`
BrokenDreams
  • 浏览: 248952 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:97922
社区版块
存档分类
最新评论

Jdk1.6 JUC源码解析(15)-SynchronousQueue

阅读更多

Jdk1.6 JUC源码解析(15)-SynchronousQueue

作者:大飞

 

功能简介:
  • SynchronousQueue是一种特殊的阻塞队列,它本身没有容量,只有当一个线程从队列取数据的同时,另一个线程才能放一个数据到队列中,反之亦然。存取过程相当于一个线程把数据(安全的)交给另一个线程的过程。
  • SynchronousQueue也支持公平和非公平模式。
源码分析:
  • SynchronousQueue内部采用伪栈和伪队列来实现,分别对应非公平模式和公平模式。先看下这部分实现。
       伪栈和伪队列的公共基类:
    static abstract class Transferer {
        /**
         * 转移数据的方法,用来实现put或者take。
         *
         * @param e 如果不为null,相当于将一个数据交给消费者;
         *          如果为null,相当于从一个生产者接收一个消费者交出的数据。
         * @param timed 操作是否支持超时。
         * @param nanos 超时时间,单位纳秒。
         * @return if non-null, the item provided or received; if null,
         *         the operation failed due to timeout or interrupt --
         *         the caller can distinguish which of these occurred
         *         by checking Thread.interrupted.
         */
        abstract Object transfer(Object e, boolean timed, long nanos);
    }
 

       先看下伪栈实现,内部结构如下:

    static final class TransferStack extends Transferer {
  
        /** 表示一个没有得到数据的消费者 */
        static final int REQUEST    = 0;
        /** 表示一个没有交出数据的生产者 */
        static final int DATA       = 1;
        /**
         * 表示正在匹配另一个生产者或者消费者。
         */
        static final int FULFILLING = 2;
        /** 判断是否包含正在匹配(FULFILLING)的标记 */
        static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
        /** Node class for TransferStacks. */
        static final class SNode {
            volatile SNode next;        // 栈中的下一个节点
            volatile SNode match;       // 和当前节点完成匹配的节点
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;
            // Note: item和mode不需要volatile修饰, 
            // 是因为它们在其他的volatile/atomic操作之前写,之后读。
            SNode(Object item) {
                this.item = item;
            }
            static final AtomicReferenceFieldUpdater<SNode, SNode>
                nextUpdater = AtomicReferenceFieldUpdater.newUpdater
                (SNode.class, SNode.class, "next");
            boolean casNext(SNode cmp, SNode val) {
                return (cmp == next &&
                        nextUpdater.compareAndSet(this, cmp, val));
            }
            static final AtomicReferenceFieldUpdater<SNode, SNode>
                matchUpdater = AtomicReferenceFieldUpdater.newUpdater
                (SNode.class, SNode.class, "match");
            /**
             * 尝试匹配节点s和当前节点,如果匹配成功,唤醒等待线程。
             * (向消费者传递数据或向生产者获取数据)调用tryMatch方法 
             * 来确定它们的等待线程,然后唤醒这个等待线程。
             *
             * @param s the node to match
             * @return true if successfully matched to s
             */
            boolean tryMatch(SNode s) {
                if (match == null &&
                    matchUpdater.compareAndSet(this, null, s)) {
                    //如果当前节点的match为空,那么CAS设置s为match,然后唤醒waiter。
                    Thread w = waiter;
                    if (w != null) {    // waiters need at most one unpark
                        waiter = null;
                        LockSupport.unpark(w);
                    }
                    return true;
                }
                //如果match不为null,或者CAS设置match失败,那么比较match和s是否为相同对象。
                //如果相同,说明已经完成匹配,匹配成功。
                return match == s;
            }
            /**
             * 尝试取消当前节点(有线程等待),通过将match设置为自身。
             */
            void tryCancel() {
                matchUpdater.compareAndSet(this, null, this);
            }
            boolean isCancelled() {
                return match == this;
            }
        }
        /** The head (top) of the stack */
        volatile SNode head;
        static final AtomicReferenceFieldUpdater<TransferStack, SNode>
            headUpdater = AtomicReferenceFieldUpdater.newUpdater
            (TransferStack.class,  SNode.class, "head");

 

 

       下面看下伪栈中transfer方法实现细节吧:

        /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /*
             * 基本算法是在一个无限循环中尝试下面三种情况里面的一种:
             *
             * 1. 如果当前栈为空或者包含与给定节点模式相同的节点,尝试 
             *    将节点压入栈内,并等待一个匹配节点,最后返回匹配节点 
             *    或者null(如果被取消)。
             *
             * 2. 如果当前栈包含于给定节点模式互补的节点,尝试将这个节 
             *    点打上FULFILLING标记,然后压入栈中,和相应的节点进行 
             *    匹配,然后将两个节点(当前节点和互补节点)弹出栈,并返 
             *    回匹配节点的数据。匹配和删除动作不是必须要做的,因为 
             *    其他线程会执行动作3:
             *
             * 3. 如果栈顶已经存在一个FULFILLING(正在满足其他节点)的节 
             *    点,帮助这个节点完成匹配和移除(出栈)的操作。然后继续 
             *    执行(主循环)。这部分代码基本和动作2的代码一样,只是 
             *    不会返回节点的数据。
             */
            SNode s = null; // constructed/reused as needed
            int mode = (e == null)? REQUEST : DATA;
            for (;;) {
                SNode h = head;
                if (h == null || h.mode == mode) {  // head为null或者head和e的mode相同。
                    if (timed && nanos <= 0) {      // 如果超时
                        if (h != null && h.isCancelled())
                            casHead(h, h.next);     // 如果h不为null且被取消,弹出h。
                        else
                            return null;            // 否则返回null。
                    } else if (casHead(h, s = snode(s, e, h, mode))) {//创建一个SNode,赋给s,将原本的head节点做为其next节点,并尝试将其设置为新的head。
                        //等待其他线程来满足当前线程。
                        SNode m = awaitFulfill(s, timed, nanos);
                        if (m == s) {               // awaitFulfill方法返回后,判断下是否被取消。
                            clean(s);               // 如果取消,清理一下s节点。
                            return null;
                        }
                        if ((h = head) != null && h.next == s) //因为上面已经将s设置为head,如果满足这个条件说明有其他节点t插入到s前面,变成了head,而且这个t就是和s匹配的节点,他们已经完成匹配。
                            casHead(h, s.next);     // 将s的next节点设置为head。相当于把s和t一起移除了。
                        return mode == REQUEST? m.item : s.item;
                    }
                } else if (!isFulfilling(h.mode)) { 
                    /*
                     * 如果栈中存在头节点,且和当前节点不是相同模式,
                     * 那么说明它们是一对儿对等的节点,尝试用当前节
                     * 点s来满足h节点。
                     */
                    if (h.isCancelled())            // 如果h节点已经被取消
                        casHead(h, h.next);         // 将h节点弹出,并将h节点的next节点设置为栈的head。
                    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//尝试将当前节点打上"正在匹配"的标记,并设置为head。
                        for (;;) {
                            SNode m = s.next;       // s是当前节点,m是s的next节点,它们是正在匹配的两个节点。
                            if (m == null) {        // 如果m为空,可能其他节点把m匹配走了。
                                casHead(s, null);   // 将s弹出
                                s = null;           // 将s置空,下轮循环的时候还会新建。
                                break;              // 回退到主循环再来一次。
                            }
                            SNode mn = m.next;      // 获取m的next节点,如果s和m匹配成功,mn就得补上head的位置了。
                            if (m.tryMatch(s)) {    // 尝试匹配一下,匹配成功的话会把m上等待的线程唤醒。
                                casHead(s, mn);     // 如果匹配成功,把s和m弹出。
                                return (mode == REQUEST)? m.item : s.item;
                            } else                  // 没匹配成功的话,说明m可能被其他节点满足了。
                                s.casNext(m, mn);   // 说明m已经被其他节点匹配了,那就把m移除掉。
                        }
                    }
                } else {                            // 到这儿的话,说明栈顶的h正在匹配过程中。
                    SNode m = h.next;               // m是h的配对儿,h正在和m匹配。
                    if (m == null)                  // 如果m为空,其他节点把m匹配走了。
                        casHead(h, null);           // 弹出h。
                    else {
                        SNode mn = m.next;          // 获取m的next节点,如果m和h匹配成功,mn就得补上head的位置了。
                        if (m.tryMatch(h))          // 帮忙匹配一下m和h。
                            casHead(h, mn);         // 匹配成功的话,把h和m弹出。
                        else                        // 没匹配成功的话,说明m可能被其他节点满足了。
                            h.casNext(m, mn);       // 没成功的话,说明m已经被其他节点匹配了,那就把m移除掉。
                    }
                }
            }
        }

 

     看下上面方法中调用的创建节点的snode方法:

        static SNode snode(SNode s, Object e, SNode next, int mode) {
            if (s == null) s = new SNode(e);
            s.mode = mode;
            s.next = next;
            return s;
        }

 

     再看下等待被匹配的方法:

        /**
         * 自旋/阻塞直到节点被匹配。
         *
         * @param s the waiting node
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched node, or s if cancelled
         */
        SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            /*
             * 在s节点真正阻塞之前,将当前线程设置到s上面,然后检
             * 查中断状态(不少于一次),以确保后续和s匹配的节点来
             * 唤醒当前线程。
             *
             * 当执行此方法时,如果执行节点恰好在栈顶,阻塞之前会 
             * 做一些自旋,为的是如果有生产者或消费者马上到来,就 
             * 不需要执行节点阻塞了。这种优化在多核下是有意义的。
             */
            long lastTime = (timed)? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            SNode h = head;
            int spins = (shouldSpin(s)?
                         (timed? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel();//如果当前线程被中断了,那么取消当前节点。
                SNode m = s.match;
                if (m != null)
                    return m; //如果已经匹配成功,就返回匹配的节点。
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel(); //如果超时了,也取消当前节点。
                        continue;
                    }
                }
                if (spins > 0)
                    spins = shouldSpin(s)? (spins-1) : 0; //自旋控制,每次循环都检测是否满足自旋条件,满足的话,自旋值就减去1,然后进入下次循环(一直减到0)
                else if (s.waiter == null)
                    s.waiter = w; //第一次循环时,会将当前线程设置到s上。
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos); //有超时条件下,会检测超时时间是否大于超时阀值(这应该是一个经验值),大于就阻塞,小于就自旋。
            }
        }
        /**
         * 如果s节点就是当前栈中头节点,或者头节点正在匹配过程中,那么可以自旋一下。
         */
        boolean shouldSpin(SNode s) {
            SNode h = head;
            return (h == s || h == null || isFulfilling(h.mode));
        }
    
    //=========下面是自旋相关参数,定义在SynchronousQueue类中============================================
    /** The number of CPUs, for spin control */
    static final int NCPUS = Runtime.getRuntime().availableProcessors();
    /**
     * The number of times to spin before blocking in timed waits.
     * The value is empirically derived -- it works well across a
     * variety of processors and OSes. Empirically, the best value
     * seems not to vary with number of CPUs (beyond 2) so is just
     * a constant.
     */
    static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
    /**
     * The number of times to spin before blocking in untimed waits.
     * This is greater than timed value because untimed waits spin
     * faster since they don't need to check times on each spin.
     */
    static final int maxUntimedSpins = maxTimedSpins * 16;
    /**
     * The number of nanoseconds for which it is faster to spin
     * rather than to use timed park. A rough estimate suffices.
     */
    static final long spinForTimeoutThreshold = 1000L;  

 

     最后看清理节点方法:

        /**
         * 当s节点被取消时,才会调用这个方法。
         */
        void clean(SNode s) {
            s.item = null;   // forget item
            s.waiter = null; // forget thread
            /*
             * At worst we may need to traverse entire stack to unlink
             * s. If there are multiple concurrent calls to clean, we
             * might not see s if another thread has already removed
             * it. But we can stop when we see any node known to
             * follow s. We use s.next unless it too is cancelled, in
             * which case we try the node one past. We don't check any
             * further because we don't want to doubly traverse just to
             * find sentinel.
             */
            SNode past = s.next;
            if (past != null && past.isCancelled())
                past = past.next; 
            // 将从栈顶节点开始到past的连续的取消节点移除。
            SNode p;
            while ((p = head) != null && p != past && p.isCancelled())
                casHead(p, p.next);
            // 如果p本身未取消(上面的while碰到一个未取消的节点就会退出,但这个节点和past节点之间可能还有取消节点),再把p到past之间的取消节点都移除。
            while (p != null && p != past) {
                SNode n = p.next;
                if (n != null && n.isCancelled())
                    p.casNext(n, n.next);
                else
                    p = n;
            }
        }
    }

   

 
       再看下伪队列实现,内部结构如下:
    static final class TransferQueue extends Transferer {
    
        /** Node class for TransferQueue. */
        static final class QNode {
            volatile QNode next;          // next node in queue
            volatile Object item;         // CAS'ed to or from null
            volatile Thread waiter;       // to control park/unpark
            final boolean isData;
            QNode(Object item, boolean isData) {
                this.item = item;
                this.isData = isData;
            }
            ...
            /**
             * 尝试取消节点。
             * 取消就是将节点的item域指向自身。
             */
            void tryCancel(Object cmp) {
                itemUpdater.compareAndSet(this, cmp, this);
            }
            boolean isCancelled() {
                return item == this;
            }
            /**
             * 判断节点是否离开了队列。
             */
            boolean isOffList() {
                return next == this;
            }
        }
        /** 队列头节点 */
        transient volatile QNode head;
        /** 队列尾节点 */
        transient volatile QNode tail;
        /**
         * 指向一个被取消的节点,如果取消这个节点时,它是最后一个进入队列的节点,
         * 那么这个节点可能还没有离开队列。
         */
        transient volatile QNode cleanMe;
        TransferQueue() {
            QNode h = new QNode(null, false); // 初始化一个哨兵节点。
            head = h;
            tail = h;
        }
 

       看下伪队列中transfer方法实现:

        /**
         * Puts or takes an item.
         */
        Object transfer(Object e, boolean timed, long nanos) {
            /* 
             * 基本算法是在一个无限循环中尝试下面两种动作里面的一种:
             *
             * 1. 如果队列为空,或者包含相同模式(存或者取)的节点。
             *    尝试将节点加入等待的队列,直到被匹配(或被取消),
             *    同时返回匹配节点的数据。
             *
             * 2. 如果队列中包含等待的节点,并且当前节点和这个等待
             *    节点能相互匹配,那么尝试匹配等待节点并将这个节点 
             *    出队,然后返回匹配节点的数据。
             *
             * 在每个动作里面,都会检测并帮助其他线程来完成节点推进。
             *
             * 在循环开始的时候会做一个非空检测,以避免当前线程看到 
             * 未初始化的头尾节点。这种情况在当前SynchronousQueue中 
             * 永远不会发生,但如果调用者持有一个非volatile/final域 
             * 的话,就有可能会发生。在循环开始的时间做这个非空检测 
             * 要比在内部(分支里)做性能好一些。
             */
            QNode s = null; // constructed/reused as needed
            boolean isData = (e != null);
            for (;;) {
                QNode t = tail;
                QNode h = head;
                if (t == null || h == null)         // 如果看到未初始化的头尾节点
                    continue;                       
                if (h == t || t.isData == isData) { // 队列为空或者当前节点和队列中节点模式相同。
                    QNode tn = t.next;
                    if (t != tail)                  // 读取到不一致的结果,说明同时有其他线程修改了tail。
                        continue;                   
                    if (tn != null) {               // 说明其他线程已经添加了新节点tn,但还没将其设置为tail。
                        advanceTail(t, tn);         // 当前线程帮忙推进尾节点,就是尝试将tn设置为尾节点。
                        continue;                   
                    }
                    if (timed && nanos <= 0)        // 超时。
                        return null;
                    if (s == null)
                        s = new QNode(e, isData);   // 初始化s。
                    if (!t.casNext(null, s))        // 尝试将当前节点s拼接到t后面。
                        continue;                   // 不成功就继续下次循环
                    advanceTail(t, s);              // 尝试将s设置为队列尾节点。
                    Object x = awaitFulfill(s, e, timed, nanos); // 然后等着被匹配。
                    if (x == s) {                   // 如果被取消。
                        clean(t, s);                // 清理s节点。
                        return null;
                    }
                    if (!s.isOffList()) {           // 如果s节点还没有离开队列。
                        advanceHead(t, s);          // 尝试将s设置为头节点,移除t。
                        if (x != null)              
                            s.item = s;
                        s.waiter = null;
                    }
                    return (x != null)? x : e;
                } else {                            // 模式正好互补。
                    QNode m = h.next;               // 找到能匹配的节点。
                    if (t != tail || m == null || h != head)
                        continue;                   // 读取到不一致的结果,进入下一轮循环。
                    Object x = m.item;
                    if (isData == (x != null) ||    // 如果m已经被匹配了。
                        x == m ||                   // 或者m被取消了。
                        !m.casItem(x, e)) {         // 如果尝试将数据e设置到m上失败。
                        advanceHead(h, m);          // 将h出队,m设置为头结点,然后重试。
                        continue;
                    }
                    advanceHead(h, m);              // 成功匹配,推进头节点。
                    LockSupport.unpark(m.waiter);   // 唤醒m上的等待线程。
                    return (x != null)? x : e;
                }
            }
        }

 

       看下transfer方法中调用的advanceHead和advanceTail方法:

        /**
         * Tries to cas nh as new head; if successful, unlink
         * old head's next node to avoid garbage retention.
         */
        void advanceHead(QNode h, QNode nh) {
            if (h == head && headUpdater.compareAndSet(this, h, nh))
                h.next = h; // forget old next
        }
        /**
         * Tries to cas nt as new tail.
         */
        void advanceTail(QNode t, QNode nt) {
            if (tail == t)
                tailUpdater.compareAndSet(this, t, nt);
        }

 

       再看下transfer方法中调用的awaitFulfill方法:

        /**
         * Spins/blocks until node s is fulfilled.
         *
         * @param s the waiting node
         * @param e the comparison value for checking match
         * @param timed true if timed wait
         * @param nanos timeout value
         * @return matched item, or s if cancelled
         */
        Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
            /* Same idea as TransferStack.awaitFulfill */
            long lastTime = (timed)? System.nanoTime() : 0;
            Thread w = Thread.currentThread();
            int spins = ((head.next == s) ?
                         (timed? maxTimedSpins : maxUntimedSpins) : 0);
            for (;;) {
                if (w.isInterrupted())
                    s.tryCancel(e);
                Object x = s.item;
                if (x != e)
                    return x;
                if (timed) {
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0) {
                        s.tryCancel(e);
                        continue;
                    }
                }
                if (spins > 0)
                    --spins;
                else if (s.waiter == null)
                    s.waiter = w;
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

 

       TransferStack的awaitFulfill方法思路差不多,代码就不做解析了。再看下transfer中调用的clean方法:

        void clean(QNode pred, QNode s) {
            s.waiter = null; // forget thread
            /*
             * 在任意给定的时间点,能删除的节点一定是最后入队的节点。
             * 为了满足这个条件,如果当前无法删除s,就将其前驱节点保 
             * 存为"cleanMe",先删除之前保存的版本。至少节点s和之前 
             * 保存的节点里面有一个能被删除,所以方法一定会结束。
             */
            while (pred.next == s) { // Return early if already unlinked
                QNode h = head;
                QNode hn = h.next;   
                if (hn != null && hn.isCancelled()) {
                    advanceHead(h, hn); //如果head节点的next节点被取消,那么推进一下head节点。
                    continue;
                }
		        QNode t = tail;      // Ensure consistent read for tail
                if (t == h)          // 如果队列为空,
                    return;
		        QNode tn = t.next;    
		        if (t != tail)       // 出现不一致读,重试。
                    continue;
                if (tn != null) {
                    advanceTail(t, tn); // 帮助推进尾节点。
                    continue;
                }
                if (s != t) {        // 如果s不是尾节点,移除s。
                    QNode sn = s.next;
                    if (sn == s || pred.casNext(s, sn)) //如果s已经被移除退出循环,否则尝试断开s
                        return;
                }
                /*
                 * 下面要做的事情大体就是:如果s是位节点,那么不会马上删除s,
                 * 而是将s的前驱节点设置为cleanMe,下次清理其他取消节点的时候
                 * 会顺便把s移除。
                 */
                QNode dp = cleanMe;
                if (dp != null) {    // 如果dp不为null,说明是前一个被取消节点,将其移除。
                    QNode d = dp.next;
                    QNode dn;
                    if (d == null ||               // d is gone or
                        d == dp ||                 // d is off list or
                        !d.isCancelled() ||        // d not cancelled or
                        (d != t &&                 // d not tail and
                         (dn = d.next) != null &&  //   has successor
                         dn != d &&                //   that is on list
                         dp.casNext(d, dn)))       //  把之前标记为cleanMe节点的next节点d移除。
                        casCleanMe(dp, null);      
                    if (dp == pred)
                        return;      // 说明s的前驱已经是cleanMe了(后续会被删掉)。
                 } else if (casCleanMe(null, pred))
                    return;          // 如果当前cleanMe为null,那么将s前驱节点设置为cleanMe,并退出。
              }
           }

         

       小总结一下:
       从上面的分析可以看出,伪栈的结构下,新来的线程会作为栈顶节点或者优先和栈顶的等待节点进行匹配,并不是公平的;但伪队列的结构下,新来的线程会在队尾,或者和队头的等待节点(最前到的)进行匹配,能够保证一定的公平性。
 
  • 有了内部伪栈和伪队列的实现,SynchronousQueue实现起来就很容易了,看下代码:
    private transient volatile Transferer transferer;

    public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        transferer = (fair)? new TransferQueue() : new TransferStack();
    }
    /**
     * 添加一个数据到队列,等到其他线程接收这个数据。
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, false, 0) == null) {
	        Thread.interrupted();
            throw new InterruptedException();
	    }
    }
    /**
     * 添加一个数据到队列,等到其他线程接收这个数据或者超时。
     *
     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
     *         specified waiting time elapses before a consumer appears.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (o == null) throw new NullPointerException();
        if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }
    /**
     * 添加一个数据到队列,如果有其他线程正等待接收这个数据且接收成功,返回true;否则返回false。
     * 
     * 这个方法不阻塞。
     * @param e the element to add
     * @return <tt>true</tt> if the element was added to this queue, else
     *         <tt>false</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        return transferer.transfer(e, true, 0) != null;
    }
    /**
     * 获取并移除队列前端的数据,如果队列中没有数据,就等待其他线程添加一个数据。
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        Object e = transferer.transfer(null, false, 0);
        if (e != null)
            return (E)e;
	Thread.interrupted();
        throw new InterruptedException();
    }
    /**
     * 获取并移除队列前端的数据,如果队列中没有数据,就等待其他线程添加一个数据或者超时。
     *
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        Object e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return (E)e;
        throw new InterruptedException();
    }
    /**
     * 如果其他线程正在添加数据到队列,那么尝试获取并移除这个数据。
     * 
     * 这个方法不阻塞。
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */
    public E poll() {
        return (E)transferer.transfer(null, true, 0);
    }

 

       由于SynchronousQueue没有实际的容量,所以其他方法实现起来很简单了:

    public boolean isEmpty() {
        return true;
    }

    public int size() {
        return 0;
    }

    public int remainingCapacity() {
        return 0;
    }
 
    public void clear() {
    }

    public boolean contains(Object o) {
        return false;
    }

    public boolean remove(Object o) {
        return false;
    }

    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    public boolean removeAll(Collection<?> c) {
        return false;
    }

    public boolean retainAll(Collection<?> c) {
        return false;
    }
    ...

 

  • 最后看一下SynchronousQueue的序列化,序列化比较特别,因为transferer域本身不需要序列化,但需要记住transferer是内部伪栈和伪队列:
    static class WaitQueue implements java.io.Serializable { }
    static class LifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3633113410248163686L;
    }
    static class FifoWaitQueue extends WaitQueue {
        private static final long serialVersionUID = -3623113410248163686L;
    }
    private ReentrantLock qlock;
    private WaitQueue waitingProducers;
    private WaitQueue waitingConsumers;
    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {
        //序列化时根据TransferQueue类型来创建WaitQueue实例。
        boolean fair = transferer instanceof TransferQueue;
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
        s.defaultWriteObject();
    }
    private void readObject(final java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        if (waitingProducers instanceof FifoWaitQueue)
            transferer = new TransferQueue();
        else
            transferer = new TransferStack();
    }

 

 
       SynchronousQueue的代码解析完毕!
 
 
 
 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics