MyDisruptorV3版本demo解析v3消费者实现解析功能

MyDisruptor V3 版本介绍

MyDisruptor v2 版本之后实现了多消费者和跨消费者组的依赖。按照计划,MyDisruptor v3 版本需要支持多线程消费者的功能。

由于本文是系列博客的一部分,所以有必要了解之前博客的内容,然后才能更好地理解本博客

MyDisruptor 支持多线程消费者 WorkerPool 实现解析

/**
 * 多线程消费者(仿Disruptor.WorkerPool)
 * */
public class MyWorkerPool {
    private final MySequence workSequence = new MySequence(-1);
    private final MyRingBuffer myRingBuffer;
    private final List<MyWorkProcessor> workEventProcessorList;
    public MyWorkerPool(
            MyRingBuffer myRingBuffer,
            MySequenceBarrier mySequenceBarrier,
            MyWorkHandler... myWorkHandlerList) {
        this.myRingBuffer = myRingBuffer;
        final int numWorkers = myWorkHandlerList.length;
        this.workEventProcessorList = new ArrayList(numWorkers);
        // 为每个自定义事件消费逻辑MyEventHandler,创建一个对应的MyWorkProcessor去处理
        for (MyWorkHandler myEventConsumer : myWorkHandlerList) {
            workEventProcessorList.add(new MyWorkProcessor(
                    myRingBuffer,
                    myEventConsumer,
                    mySequenceBarrier,
                    this.workSequence));
        }
    }
    /**
     * 返回包括每个workerEventProcessor + workerPool自身的序列号集合
     * */
    public MySequence[] getCurrentWorkerSequences() {
        final MySequence[] sequences = new MySequence[this.workEventProcessorList.size() + 1];
        for (int i = 0, size = workEventProcessorList.size(); i < size; i++) {
            sequences[i] = workEventProcessorList.get(i).getCurrentConsumeSequence();
        }
        sequences[sequences.length - 1] = workSequence;
        return sequences;
    }

图片[1]-MyDisruptorV3版本demo解析v3消费者实现解析功能-唐朝资源网

public MyRingBuffer start(final Executor executor) { final long cursor = myRingBuffer.getCurrentProducerSequence().get(); workSequence.set(cursor); for (MyWorkProcessor processor : workEventProcessorList) { processor.getCurrentConsumeSequence().set(cursor); executor.execute(processor); } return this.myRingBuffer; } } /** * 多线程消费者-事件处理器接口 * */ public interface MyWorkHandler { /** * 消费者消费事件 * @param event 事件对象本身 * */ void consume(T event); }

MyWorkProcessor 实现分析

接下来是本篇博客的关键部分,MyWorkProcessor 的实现。

/**
 * 多线程消费者工作线程 (仿Disruptor.WorkProcessor)
 * */
public class MyWorkProcessor implements Runnable{
    private final MySequence currentConsumeSequence = new MySequence(-1);
    private final MyRingBuffer myRingBuffer;
    private final MyWorkHandler myWorkHandler;
    private final MySequenceBarrier sequenceBarrier;
    private final MySequence workGroupSequence;
    public MyWorkProcessor(MyRingBuffer myRingBuffer,
                           MyWorkHandler myWorkHandler,

                           MySequenceBarrier sequenceBarrier,
                           MySequence workGroupSequence) {
        this.myRingBuffer = myRingBuffer;
        this.myWorkHandler = myWorkHandler;
        this.sequenceBarrier = sequenceBarrier;
        this.workGroupSequence = workGroupSequence;
    }
    public MySequence getCurrentConsumeSequence() {
        return currentConsumeSequence;
    }
    @Override
    public void run() {
        long nextConsumerIndex = this.currentConsumeSequence.get() + 1;
        // 设置哨兵值,保证第一次循环时nextConsumerIndex <= cachedAvailableSequence一定为false,走else分支通过序列屏障获得最大的可用序列号
        long cachedAvailableSequence = Long.MIN_VALUE;
        // 最近是否处理过了序列
        boolean processedSequence = true;
        while (true) {
            try {
                if(processedSequence) {
                    // 争抢到了一个新的待消费序列,但还未实际进行消费(标记为false)
                    processedSequence = false;
                    
                    // 如果已经处理过序列,则重新cas的争抢一个新的待消费序列
                    do {
                        nextConsumerIndex = this.workGroupSequence.get() + 1L;
                        // 由于currentConsumeSequence会被注册到生产者侧,因此需要始终和workGroupSequence worker组的实际sequence保持协调
                        // 即当前worker的消费序列currentConsumeSequence = 当前消费者组的序列workGroupSequence
                        this.currentConsumeSequence.lazySet(nextConsumerIndex - 1L);
                        // 问题:只使用workGroupSequence,每个worker不维护currentConsumeSequence行不行?
                        // 回答:这是不行的。因为和单线程消费者的行为一样,都是具体的消费者eventHandler/workHandler执行过之后才更新消费者的序列号,令其对外部可见(生产者、下游消费者)
                        // 因为消费依赖关系中约定,对于序列i事件只有在上游的消费者消费过后(eventHandler/workHandler执行过),下游才能消费序列i的事件
                        // workGroupSequence主要是用于通过cas协调同一workerPool内消费者线程序列争抢的,对外的约束依然需要workProcessor本地的消费者序列currentConsumeSequence来控制
                        // cas更新,保证每个worker线程都会获取到唯一的一个sequence
                    } while (!workGroupSequence.compareAndSet(nextConsumerIndex - 1L, nextConsumerIndex));
                }else{
                    // processedSequence == false(手头上存在一个还未消费的序列)

图片[2]-MyDisruptorV3版本demo解析v3消费者实现解析功能-唐朝资源网

// 走到这里说明之前拿到了一个新的消费序列,但是由于nextConsumerIndex > cachedAvailableSequence,没有实际执行消费逻辑 // 而是被阻塞后返回获得了最新的cachedAvailableSequence,重新执行一次循环走到了这里 // 需要先把手头上的这个序列给消费掉,才能继续拿下一个消费序列 } // cachedAvailableSequence只会存在两种情况 // 1 第一次循环,初始化为Long.MIN_VALUE,则必定会走到下面的else分支中 // 2 非第一次循环,则cachedAvailableSequence为序列屏障所允许的最大可消费序列 if (nextConsumerIndex <= cachedAvailableSequence) { // 争抢到的消费序列是满足要求的(小于序列屏障值,被序列屏障允许的),则调用消费者进行实际的消费 // 取出可以消费的下标对应的事件,交给eventConsumer消费 T event = myRingBuffer.get(nextConsumerIndex); this.myWorkHandler.consume(event); // 实际调用消费者进行消费了,标记为true.这样一来就可以在下次循环中cas争抢下一个新的消费序列了 processedSequence = true; } else { // 1 第一次循环会获取当前序列屏障的最大可消费序列 // 2 非第一次循环,说明争抢到的序列超过了屏障序列的最大值,等待生产者推进到争抢到的sequence cachedAvailableSequence = sequenceBarrier.getAvailableConsumeSequence(nextConsumerIndex); } } catch (final Throwable ex) { // 消费者消费时发生了异常,也认为是成功消费了,避免阻塞消费序列 // 下次循环会cas争抢一个新的消费序列 processedSequence = true; } } } }

它还实现了 Runnable 接口,并作为一个独立的线程工作,通过主循环持续监控上游进度。消费速度也是由构造函数传入的sequenceBarrier控制的。只用workGroupSequence,每个MyWorkProcessor不会单独维护currentConsumeSequence吧?MyDisruptor v3 版本演示分析

v3 版本支持多线程消费者功能。以下演示展示了如何使用此功能。

public class MyRingBufferV3Demo {
    /**
     *              -> 多线程消费者B(依赖A)
     * 单线程消费者A                       -> 单线程消费者D(依赖B、C)
     *              -> 单线程消费者C(依赖A)
     * */
    public static void main(String[] args) throws InterruptedException {

        // 环形队列容量为16(2的4次方)
        int ringBufferSize = 16;
        // 创建环形队列
        MyRingBuffer myRingBuffer = MyRingBuffer.createSingleProducer(
                new OrderEventProducer(), ringBufferSize, new MyBlockingWaitStrategy());
        // 获得ringBuffer的序列屏障(最上游的序列屏障内只维护生产者的序列)
        MySequenceBarrier mySequenceBarrier = myRingBuffer.newBarrier();
        // ================================== 基于生产者序列屏障,创建消费者A
        MyBatchEventProcessor eventProcessorA =
                new MyBatchEventProcessor(myRingBuffer, new OrderEventHandlerDemo("consumerA"), mySequenceBarrier);
        MySequence consumeSequenceA = eventProcessorA.getCurrentConsumeSequence();
        // RingBuffer监听消费者A的序列
        myRingBuffer.addGatingConsumerSequenceList(consumeSequenceA);
        // ================================== 消费者组依赖上游的消费者A,通过消费者A的序列号创建序列屏障(构成消费的顺序依赖)
        MySequenceBarrier workerSequenceBarrier = myRingBuffer.newBarrier(consumeSequenceA);
        // 基于序列屏障,创建多线程消费者B
        MyWorkerPool workerPoolProcessorB =
                new MyWorkerPool(myRingBuffer, workerSequenceBarrier,
                        new OrderWorkHandlerDemo("workerHandler1"),
                        new OrderWorkHandlerDemo("workerHandler2"),
                        new OrderWorkHandlerDemo("workerHandler3"));
        MySequence[] workerSequences = workerPoolProcessorB.getCurrentWorkerSequences();
        // RingBuffer监听消费者C的序列
        myRingBuffer.addGatingConsumerSequenceList(workerSequences);
        // ================================== 通过消费者A的序列号创建序列屏障(构成消费的顺序依赖),创建消费者C
        MySequenceBarrier mySequenceBarrierC = myRingBuffer.newBarrier(consumeSequenceA);
        MyBatchEventProcessor eventProcessorC =
                new MyBatchEventProcessor(myRingBuffer, new OrderEventHandlerDemo("consumerC"), mySequenceBarrierC);
        MySequence consumeSequenceC = eventProcessorC.getCurrentConsumeSequence();
        // RingBuffer监听消费者C的序列
        myRingBuffer.addGatingConsumerSequenceList(consumeSequenceC);
        // ================================== 基于多线程消费者B,单线程消费者C的序列屏障,创建消费者D
        MySequence[] bAndCSequenceArr = new MySequence[workerSequences.length+1];
        // 把多线程消费者B的序列复制到合并的序列数组中
        System.arraycopy(workerSequences, 0, bAndCSequenceArr, 0, workerSequences.length);

        // 数组的最后一位是消费者C的序列
        bAndCSequenceArr[bAndCSequenceArr.length-1] = consumeSequenceC;
        MySequenceBarrier mySequenceBarrierD = myRingBuffer.newBarrier(bAndCSequenceArr);
        MyBatchEventProcessor eventProcessorD =
                new MyBatchEventProcessor(myRingBuffer, new OrderEventHandlerDemo("consumerD"), mySequenceBarrierD);
        MySequence consumeSequenceD = eventProcessorD.getCurrentConsumeSequence();
        // RingBuffer监听消费者D的序列
        myRingBuffer.addGatingConsumerSequenceList(consumeSequenceD);
        // 启动消费者线程A
        new Thread(eventProcessorA).start();
        // 启动workerPool多线程消费者B
        workerPoolProcessorB.start(Executors.newFixedThreadPool(100, new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r,"worker" + mCount.getAndIncrement());
            }
        }));
        // 启动消费者线程C
        new Thread(eventProcessorC).start();
        // 启动消费者线程D
        new Thread(eventProcessorD).start();
        // 生产者发布100个事件
        for(int i=0; i<100; i++) {
            long nextIndex = myRingBuffer.next();
            OrderEventModel orderEvent = myRingBuffer.get(nextIndex);
            orderEvent.setMessage("message-"+i);
            orderEvent.setPrice(i * 10);
            System.out.println("生产者发布事件:" + orderEvent);
            myRingBuffer.publish(nextIndex);
        }
    }
}

总结

Disruptor无论在整体设计还是最终代码实现上都有很多值得深思和学习的细节。希望对对disruptor感兴趣的小伙伴有所帮助。

这个博客的完整代码在我的 github 上:branch: feature/lab3

© 版权声明
THE END
喜欢就支持一下吧
点赞175 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片