`
BrokenDreams
  • 浏览: 248149 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:97548
社区版块
存档分类
最新评论

Jdk1.6 JUC源码解析(22)-LinkedBlockingDeque

阅读更多

Jdk1.6 JUC源码解析(22)-LinkedBlockingDeque

作者:大飞

 

功能简介:
  • LinkedBlockingDeque是一种基于双向链表实现的有界的(可选的,不指定默认int最大值)阻塞双端队列。
       双端队列一般适用于工作密取模式,即每个消费者都拥有自己的双端队列,如果某个消费者完成了自己队列的全部任务,可以到其他消费者双端队列尾部秘密获取任务来处理。
 
源码分析:
  • LinkedBlockingDeque实现了BlockingDeque接口,简单看下这个接口:

 

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
    /**
     * 将元素插入队头,如果队列满了,抛出IllegalStateException。
     */
    void addFirst(E e);
    /**
     * 将元素插入队尾,如果队列满了,抛出IllegalStateException。
     */
    void addLast(E e);
    /**
     * 将元素插入队头,如果成功,返回true;如果队列满了,返回false。
     */
    boolean offerFirst(E e);
    /**
     * 将元素插入队尾,如果成功,返回true;如果队列满了,返回false。
     */
    boolean offerLast(E e);
    /**
     * 将元素插入队头。如果队列满了,阻塞等待,直到队列有可用空间。
     */
    void putFirst(E e) throws InterruptedException;
    /**
     * 将元素插入队尾。如果队列满了,阻塞等待,直到队列有可用空间。
     */
    void putLast(E e) throws InterruptedException;
    /**
     * 将元素插入队头。如果队列满了,阻塞等待。
     * 如果插入成功,返回true;如果等待超时,返回false。
     */
    boolean offerFirst(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 将元素插入队尾。如果队列满了,阻塞等待。
     * 如果插入成功,返回true;如果等待超时,返回false。
     */
    boolean offerLast(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 获取并移除队头元素,如果队头没有元素,阻塞等待,直到有元素可以获取。
     */
    E takeFirst() throws InterruptedException;
    /**
     * 获取并移除队尾元素,如果队头没有元素,阻塞等待,直到有元素可以获取。
     */
    E takeLast() throws InterruptedException;
    /**
     * 获取并移除队头元素,如果队头没有元素,阻塞等待。
     * 如果过了给定的时间还不能获取元素,返回null。
     */
    E pollFirst(long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 获取并移除队尾元素,如果队头没有元素,阻塞等待。
     * 如果过了给定的时间还不能获取元素,返回null。
     */
    E pollLast(long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 从队列中删除第一个出现的指定元素。
     * 如果有这个元素并删除,返回true;如果没有指定元素,返回false。
     */
    boolean removeFirstOccurrence(Object o);
    /**
     * 从队列中删除最后一个出现的指定元素。
     * 如果有这个元素并删除,返回true;如果没有指定元素,返回false。
     */
    boolean removeLastOccurrence(Object o);
    // *** BlockingQueue methods ***
    /**
     * 添加一个元素到队尾,如果成功,返回true。如果队列满了,抛出IllegalStateException。
     */
    boolean add(E e);
    /**
     * 添加一个元素到队尾。添加成功,返回treue;如果队列满了,返回false。
     */
    boolean offer(E e);
    /**
     * 将元素插入队尾。如果队列满了,阻塞等待。
     */
    void put(E e) throws InterruptedException;
    /**
     * 将元素插入队尾。如果队列满了,阻塞等待。
     * 如果插入成功,返回true;如果等待超时,返回false。
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 获取并移除队头元素,如果队头没有元素,抛出NoSuchElementException异常。
     */
    E remove();
    /**
     * 获取并移除队头元素,如果队头没有元素,返回null。
     */
    E poll();
    /**
     * 获取并移除队头元素,如果队头没有元素,阻塞等待,直到有元素可以获取。
     */
    E take() throws InterruptedException;
    /**
     * 获取并移除队头元素,如果队头没有元素,阻塞等待。
     * 如果过了给定的时间还不能获取元素,返回null。
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 查看队头元素,如果队头没有元素,抛出NoSuchElementException。
     */
    E element();
    /**
     * 查看队头元素,如果队头没有元素,返回null。
     */
    E peek();
    /**
     * 从队列中删除第一个出现的指定元素。
     * 如果有这个元素并删除,返回true;如果没有指定元素,返回false。
     */
    boolean remove(Object o);
    /**
     * 查看队列是否包含给定的元素,包含返回true;不包含返回false。
     */
    public boolean contains(Object o);
    /**
     * 返回队列中元素数量。
     */
    public int size();
    /**
     * 返回一个从头到尾排序的迭代器。
     */
    Iterator<E> iterator();
    // *** Stack methods ***
    /**
     * 相当于addFirst(Object)
     */
    void push(E e);
}
 

 

 

  • 接下来看下LinkedBlockingDeque内部数据结构:

 

public class LinkedBlockingDeque<E>
    extends AbstractQueue<E>
    implements BlockingDeque<E>,  java.io.Serializable {
    private static final long serialVersionUID = -387911632671998426L;
    /** Doubly-linked list node class */
    static final class Node<E> {
       /**
        * 保存元素的域,如果为nul说明当前节点已被删除。
        */
	E item;
        /**
         * - 指向其前驱节点。
         * - 如果指向自身,说明前面是队尾节点。
         * - 如果为null,说明没有前驱节点。
         */
        Node<E> prev;
        /**
         * - 指向其后继节点。
         * - 如果指向自身,说明后面是队头节点。
         * - 如果为null,说明没有后继节点。
         */
        Node<E> next;
        Node(E x, Node<E> p, Node<E> n) {
            item = x;
            prev = p;
            next = n;
        }
    }
    /** 指向队头节点 */
    transient Node<E> first;
    /** 指向队尾节点 */
    transient Node<E> last;
    /** 队列中元素数量 */
    private transient int count;
    /** 队列最大容量 */
    private final int capacity;
    /** 队列中保护访问使用的锁 */
    final ReentrantLock lock = new ReentrantLock();
    /** 获取元素的等待条件(队列非空) */
    private final Condition notEmpty = lock.newCondition();
    /** 插入元素的等待条件(队列非满) */
    private final Condition notFull = lock.newCondition();
    /**
     * 不指定容量,默认为Integer.MAX_VALUE(可认为无限)。
     */
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }

    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

       内部结构:一个双向链表、一把锁和两个锁条件。

 
  • 再看下内部的一些基础操作,这些操作必须在持有锁的前提下调用:
    /**
     * Links e as first element, or returns false if full.
     */
    private boolean linkFirst(E e) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity)
            return false; //如果队列已满,返回false;
        Node<E> f = first;
        //新建节点x,用来存放数据e,将e插入到队头节点前面。
        Node<E> x = new Node<E>(e, null, f);
        //然后将e设置为队头节点。 
        first = x;
        if (last == null)
            last = x; //如果没有队尾节点,那么将x设置为队尾节点。
        else
            f.prev = x; //如果有队尾节点,那么将f的prev指向x,完成节点拼接。
        ++count; // 累加当前元素计数
        notEmpty.signal(); //有元素入队了,唤醒在notEmpty上等待的获取元素的线程。
        return true;
    }

       linkFirst就是将一个元素插入到队头,并成为新的队头元素。 

 

    /**
     * Links e as last element, or returns false if full.
     */
    private boolean linkLast(E e) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity)
            return false; //如果队列已满,返回false;
        Node<E> l = last;
        //新建节点x,用来存放数据e,将e插入到队尾节点后面。
        Node<E> x = new Node<E>(e, l, null);
        last = x;
        if (first == null)
            first = x; //如果没有队头节点,那么将x设置为队头节点。
        else
            l.next = x; //如果有队头节点,那么将l的next指向x,完成节点拼接。
        ++count; // 累加当前元素计数
        notEmpty.signal(); //有元素入队了,唤醒在notEmpty上等待的获取元素的线程。
        return true;
    }

       linkLast就是将一个元素插入到队尾,并成为新的队尾元素。 

 

    /**
     * Removes and returns first element, or null if empty.
     */
    private E unlinkFirst() {
        // assert lock.isHeldByCurrentThread();
        Node<E> f = first; //获取队头节点f。
        if (f == null)
            return null; //如果没有队头节点,返回null。
        Node<E> n = f.next; //获取f的后继节点。
        E item = f.item; //获取f的元素item。
        f.item = null; //将f的item域置空。
        f.next = f; // 将f的next域指向自身,帮助GC。
        first = n; //将f的后继节点n设置为新的队头节点。
        if (n == null)
            last = null; //如果n为空,说明队列为空了,把队尾节点也置空一下。
        else
            n.prev = null; //如果n不为空,现在n是队头节点,需要将其prev域置空。
        --count; //递减当前元素计数
        notFull.signal(); //有元素出队了,唤醒在notFull上等待的插入元素的线程。
        return item; //返回元素item。
    }

       unlinkFirst就是将现有的队头节点移除,并将其后继节点设置为新的队头节点,并返回移除的队头节点中保存的元素。 

 

    /**
     * Removes and returns last element, or null if empty.
     */
    private E unlinkLast() {
        // assert lock.isHeldByCurrentThread();
        Node<E> l = last; //获取队尾节点l。
        if (l == null)
            return null; //如果没有队尾节点,返回null。
        Node<E> p = l.prev; //获取l的前驱节点。
        E item = l.item; //获取l的元素item。
        l.item = null; //将l的item域置空。
        l.prev = l; // 将f的prev域指向自身,帮助GC。
        last = p; //将l的前驱节点p设置为新的队尾节点。
        if (p == null)
            first = null; //如果p为空,说明队列为空了,把队头节点也置空一下。
        else
            p.next = null; //如果p不为空,现在p是队尾节点,需要将其next域置空。
        --count; //递减当前元素计数
        notFull.signal(); //有元素出队了,唤醒在notFull上等待的插入元素的线程。
        return item; //返回元素item。
    }

      unlinkLast就是将现有的队尾节点移除,并将其前驱节点设置为新的队尾节点,并返回移除的队尾节点中保存的元素。 

 

    /**
     * Unlinks x
     */
    void unlink(Node<E> x) {
        // assert lock.isHeldByCurrentThread();
        Node<E> p = x.prev;
        Node<E> n = x.next;
        if (p == null) {
           unlinkFirst(); //如果x没有前驱节点,那么x就是队头节点,所以调用一下unlinkFirst就可以了。
        } else if (n == null) {
          unlinkLast(); //如果x没有后继节点,那么x就是队尾节点,所以调用一下unlinkLast就可以了。
        } else {
            p.next = n; //将x的前驱节点p的next指向x的后继节点n。
            n.prev = p; //将x的后继节点n的prev指向x的前驱节点n。
            x.item = null; //置空x的item域。
            // 注意上面并没有清理x本身的prev和next域,因为它们可能正在被某个迭代器使用中。
            --count; //递减当前元素计数
            notFull.signal(); //有元素出队了,唤醒在notFull上等待的插入元素的线程。
        }
    }

 

 

  • LinkedBlockingDeque中利用上述基础方法来实现BlockingDeque接口定义的方法,随便看几个:

 

    public void putFirst(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkFirst(e))
                notFull.await(); //如果插入元素到队头失败,在notFull条件上等待。
        } finally {
            lock.unlock();
        }
    }

    public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkLast(e))
                notFull.await(); //如果插入元素到队尾失败,在notFull条件上等待。
        } finally {
            lock.unlock();
        }
    }
    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null)
                notEmpty.await(); //如果从队头获取并删除元素失败,在notEmpty条件上等待。
            return x;
        } finally {
            lock.unlock();
        }
    }
    public E takeLast() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkLast()) == null)
                notEmpty.await(); //如果从队尾获取并删除元素失败,在notEmpty条件上等待。
            return x;
        } finally {
            lock.unlock();
        }
    }

 

 

  • 其他的方法也很容易看懂了,这里就不啰嗦了。最后注意LinkedBlockingDeque的迭代器是弱一致的,而且支持双向迭代器。
 
       LinkedBlockingDeque的代码解析完毕!
 
 
 
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics