Coup de Grace

内存队列disruptor源码解读 抽象与用法

起因是一位同事选型时坚称 log4j2 秒了,强无敌…

于是我翻了翻 apache 发现他们性能测试时的文字游戏

另外就是 Logback 在 appender 丰富度和Metrics 输出能力上还是很不错的, Pivotal 那帮人改方案之前我觉得完全没必要.

真实系统中的瓶颈几乎永远不会是线程间数据交换,不过Disruptor的 paper 真是令人读的神清气爽.

但是Disruptor好东西得读一下.


从何说起

我们这次从Usage 跟它宣称解决的问题来入手.

实际上这几个点都被 log4j2拿去当有点宣传了.

Usage:

Disruptor(
        //RingBuffer上初始化占位 instance 的产生器
        final EventFactory<T> eventFactory, 
        //环形数组大小,取2的 n 次方来避免取余
        final int ringBufferSize, 
        //从 RingBuffer 上获取任务的线程池
        final ThreadFactory threadFactory, 
        //生产策略    
        final ProducerType producerType, 
        //消费策略
        final WaitStrategy waitStrategy) 

而后

//指定消费者/事件处理器
Disruptor#handleEventsWith(EventHandler<? super T>...)
Disruptor#handleEventsWith(EventProcessorFactory<T>...)
//这个 then 分组是怎么实现的得看看
Disruptor#handleEventsWith(EventHandler<? super T>...)then(EventHandler<? super T>...)

//事件处理器开始绑定到消费线程上,并返回 RingBuffer 提供 slot 塞事件 
Disruptor#start

启动过程跟 usage 到此为止.

而现在我们给出核心抽象:

本部分内容来自这里

public static void main(String[] args) throws Exception {
        // The ThreadFactory for create producer thread.
        ThreadFactory producerFactory = new ProducerFactory();
        // The factory for the event
        LongEventFactory eventFactory = new LongEventFactory();
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 8;

        // Construct the Disruptor,创建Disruptor组件
        Disruptor<LongEvent> disruptor = new Disruptor<>(
                eventFactory, 
                bufferSize, 
                producerFactory, 
                ProducerType.SINGLE, 
                new BlockingWaitStrategy()
            );

        // Connect the handler,绑定消费者事件,可以是多个
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.handleEventsWith(new LogEventHandler());

        // Start the Disruptor, starts all threads running
        //启动Disruptor,启动所有线程,主要是消费者对应的EventProcessor侦听线程
        //消费者事件处理器开始侦听RingBuffer中的消息
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++){
            bb.putLong(0, l);
            //生产者向RingBuffer中写入消息
            producer.onData(bb);
            Thread.sleep(10);
        }
    }

done.