`
BrokenDreams
  • 浏览: 248974 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:97932
社区版块
存档分类
最新评论

disruptor-3.3.2源码解析(2)-队列

阅读更多

disruptor-3.3.2源码解析(2)-队列

作者:大飞

 

  • Disruptor中的队列-RingBuffer
       RingBuffer是disruptor最重要的核心组件,如果以生产者/消费者模式来看待disruptor框架的话,那RingBuffer就是生产者和消费者的工作队列了。RingBuffer可以理解为是一个环形队列,那内部是怎么实现的呢?看下源码。
 
       首先,RingBuffer实现了一系列接口,Cursored、EventSequencer和EventSink,Cursored上篇提过了,这里看下后面两个:
public interface EventSequencer<T> extends DataProvider<T>, Sequenced{
}
 
public interface DataProvider<T>{
    T get(long sequence);
}
       EventSequencer扩展了Sequenced,提供了一些序列功能;同时扩展了DataProvider,提供了按序列值来获取数据的功能。 
 
public interface EventSink<E>{
    void publishEvent(EventTranslator<E> translator);

    boolean tryPublishEvent(EventTranslator<E> translator);

    <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0);

    <A> boolean tryPublishEvent(EventTranslatorOneArg<E, A> translator, A arg0);

    <A, B> void publishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1);

    <A, B> boolean tryPublishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1);

    <A, B, C> void publishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2);

    <A, B, C> boolean tryPublishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2);

    void publishEvent(EventTranslatorVararg<E> translator, Object... args);

    boolean tryPublishEvent(EventTranslatorVararg<E> translator, Object... args);

    void publishEvents(EventTranslator<E>[] translators);

    void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize);

    boolean tryPublishEvents(EventTranslator<E>[] translators);

    boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize);

    <A> void publishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0);

    <A> void publishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0);

    <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0);

    <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0);

    <A, B> void publishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1);

    <A, B> void publishEvents(EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1);

    <A, B> boolean tryPublishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1);

    <A, B> boolean tryPublishEvents(EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1);

    <A, B, C> void publishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2);

    <A, B, C> void publishEvents(EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1, C[] arg2);

    <A, B, C> boolean tryPublishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2);

    <A, B, C> boolean tryPublishEvents(EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1, C[] arg2);

    void publishEvents(EventTranslatorVararg<E> translator, Object[]... args);

    void publishEvents(EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize, Object[]... args);

    boolean tryPublishEvents(EventTranslatorVararg<E> translator, Object[]... args);

    boolean tryPublishEvents(EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize, Object[]... args);
}
       可见,EventSink主要是提供发布事件(就是往队列上放数据)的功能,接口上定义了以各种姿势发布事件的方法。
 
       了解了RingBuffer的接口功能,下面看下RingBuffer的结构:
abstract class RingBufferPad{
    protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields<E> extends RingBufferPad{
    private static final int BUFFER_PAD;
    private static final long REF_ARRAY_BASE;
    private static final int REF_ELEMENT_SHIFT;
    private static final Unsafe UNSAFE = Util.getUnsafe();
    static{
        final int scale = UNSAFE.arrayIndexScale(Object[].class);
        if (4 == scale){
            REF_ELEMENT_SHIFT = 2;
        } else if (8 == scale){
            REF_ELEMENT_SHIFT = 3;
        }else{
            throw new IllegalStateException("Unknown pointer size");
        }
        BUFFER_PAD = 128 / scale;
        // Including the buffer pad in the array base offset
        REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
    }
    private final long indexMask;
    private final Object[] entries;
    protected final int bufferSize;
    protected final Sequencer sequencer;
    RingBufferFields(EventFactory<E> eventFactory,
                     Sequencer       sequencer){
        this.sequencer  = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1){
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1){
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        this.entries   = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    }
    private void fill(EventFactory<E> eventFactory){
        for (int i = 0; i < bufferSize; i++){
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
    @SuppressWarnings("unchecked")
    protected final E elementAt(long sequence){
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>{
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;

    RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer){
        super(eventFactory, sequencer);
    }
       RingBuffer的内部结构明确了:内部用数组来实现,同时有保存数组长度的域bufferSize和下标掩码indexMask,还有一个sequencer。
       这里要注意几点:
              1.整个RingBuffer内部做了大量的缓存行填充,前后各填充了56个字节,entries本身也根据引用大小进行了填充,假设引用大小为4字节,那么entries数组两侧就要个填充32个空数组位。也就是说,实际的数组长度比bufferSize要大。所以可以看到根据序列从entries中取元素的方法elementAt内部做了一些调整,不是单纯的取模。
              2.bufferSize必须是2的幂,indexMask就是bufferSize-1,这样取模更高效(sequence&indexMask)。
              3.初始化时需要传入一个EventFactory,用来做队列内事件的预填充。
 
       总结下RingBuffer的特点:内部做了细致的缓存行填充,避免伪共享;内部队列基于数组实现,能很好的利用程序的局部性;队列上的事件槽会不断重复利用,不受JavaGC的影响。
 
       从上面的代码看到,RingBuffer对本包外屏蔽了构造方法,那么怎创建RingBuffer实例呢? 
    public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory,
                                                        int             bufferSize,
                                                        WaitStrategy    waitStrategy){
        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }

    public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize){
        return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
    }

    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory,
                                                         int             bufferSize,
                                                         WaitStrategy    waitStrategy){
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }

    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize){
        return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy());
    }

    public static <E> RingBuffer<E> create(ProducerType    producerType,
                                           EventFactory<E> factory,
                                           int             bufferSize,
                                           WaitStrategy    waitStrategy){
        switch (producerType){
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
        }
    }
       可见,RingBuffer提供了静态工厂方法分别针对单事件发布者和多事件发布者的情况进行RingBuffer实例创建。
 
       RingBuffer方法实现也比较简单,看几个:
    @Override
    public E get(long sequence){
        return elementAt(sequence);
    }
       事件发布者和事件处理者申请到序列后,都会通过这个方法从序列上获取事件槽来生产或者发布事件。
 
    @Override
    public long next(int n){
        return sequencer.next(n);
    }
       next系列的方法是通过内部的sequencer来实现的。   
 
    public boolean isPublished(long sequence){
        return sequencer.isAvailable(sequence);
    }
       判断某个序列是否已经被事件发布者发布事件。  
 
    public void addGatingSequences(Sequence... gatingSequences){
        sequencer.addGatingSequences(gatingSequences);
    }

    public long getMinimumGatingSequence(){
        return sequencer.getMinimumSequence();
    }

    public boolean removeGatingSequence(Sequence sequence){
        return sequencer.removeGatingSequence(sequence);
    }
       追踪序列相关的方法。 
 
    @Override
    public void publishEvent(EventTranslator<E> translator){
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence);
    }
    private void translateAndPublish(EventTranslator<E> translator, long sequence){
        try{
            translator.translateTo(get(sequence), sequence);
        }finally{
            sequencer.publish(sequence);
        }
    }
       可见,发布事件分三步:
              1.申请序列。
              2.填充事件。
              3.提交序列。
 
       其他方法实现都很简单,这里不啰嗦了。
 
  • 最后总结:
       有关RingBuffer要记住以下几点:
              1.RingBuffer是协调事件发布者和事件处理者的中间队列,事件发布者发布事件放到RingBuffer,事件处理者从RingBuffer上拿事件进行消费。
              2.RingBuffer可以认为是一个环形队列,底层由数组实现。内部做了大量的缓存行填充,保存事件使用的数组的长度必须是2的幂,这样可以高效的取模(取模本身就包含绕回逻辑,按照序列不断的增长,形成一个环形轨迹)。由于RingBuffer这样的特性,也避免了GC带来的性能影响,因为RingBuffer本身永远不会被GC。
              3.RingBuffer和普通的FIFO队列相比还有一个重要的区别就是,RingBuffer避免了头尾节点的竞争,多个事件发布者/事件处理者之间不必竞争同一个节点,只需要安全申请序列值各自存取事件就好了。 
 
分享到:
评论
1 楼 746238836 2019-01-25  
整个RingBuffer内部做了大量的缓存行填充,前后各填充了56个字节,entries本身也根据引用大小进行了填充,假设引用大小为4字节,那么entries数组两侧就要个填充32个空数组位。  这一段的因果关系没理解,为什么前后填充了56个字节,引用大小为4字节,,,这时候entries数组两侧就要各填充32个空数组位,这个因果关系没理解,望告知!!!

相关推荐

Global site tag (gtag.js) - Google Analytics