`
BrokenDreams
  • 浏览: 248954 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:97928
社区版块
存档分类
最新评论

Jdk1.6 JUC源码解析(18)-DelayQueue

阅读更多

Jdk1.6 JUC源码解析(18)-DelayQueue

作者:大飞

 

功能简介:
  • DelayQueue是一种无界的阻塞队列,队列里只允许放入可以"延期"的元素,队列中列头的元素是最先"到期"的元素。如果队列中没有任何元素"到期",尽管队列中有元素,也不能从队列头获取到任何元素。
源码分析:
  • 首先还是看一下内部数据结构:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    private transient final ReentrantLock lock = new ReentrantLock();
    private transient final Condition available = lock.newCondition();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

       内部结构非常简单,一把锁,一个条件,一个优先队列。

 
       DelayQueue要求放入其中的元素必须实现Delayed接口,看下这个接口:
/**
 * A mix-in style interface for marking objects that should be
 * acted upon after a given delay.
 *
 * <p>An implementation of this interface must define a
 * <tt>compareTo</tt> method that provides an ordering consistent with
 * its <tt>getDelay</tt> method.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Delayed extends Comparable<Delayed> {
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}
       这个接口定义了一个返回延时值的方法,而且扩展了Comparable接口,具体实现的排序方式会和延时值有关,延时值最小的会排在前面。再结合上面DelayQueue的内部数据结构,我们就可以大概脑补这个过程了。
 
  • 既然是阻塞队列,还是从put和take方法开始入手,先看下put方法:
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //获取队头元素。
            E first = q.peek();
            //将元素放入内部队列。
            q.offer(e);
            if (first == null || e.compareTo(first) < 0)
                available.signalAll(); //如果队头没有元素 或者 当前元素比队头元素的延时值小,那么唤醒available条件上的线程。
            return true;
        } finally {
            lock.unlock();
        }
    }
    /**
     * 插入一个元素到延迟队列,由于队列是无界的,所以这个方法永远不会阻塞。
     *
     * @param e the element to add
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) {
        offer(e);
    }

 

       再看下take方法: 

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                //获取队头元素。
                E first = q.peek();
                if (first == null) {
                    available.await();//如果队头没有元素,那么当前线程在available条件上等待。
                } else {
                    //如果队头有元素,获取元素的延时值。
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        //如果延时值大于0,那么等待一下。
                        long tl = available.awaitNanos(delay); 
                    } else {
                        //否则获取并移除队列列头元素。
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // 如果内部队列中还有元素,那么唤醒其他在available条件上等待着的take线程。
                        return x;
                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

 

  • 其他方法代码也很简单,这里不做分析了。最后注意下本类的迭代器是弱一致的,并且不保证元素的特定顺序。
 
       DelayQueue的代码解析完毕!
 
 
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics