为什么我的 Disruptor 程序没有充分利用环形缓冲区

ges*_*nri 2 java producer-consumer disruptor-pattern

Disruptor github地址为:https://github.com/LMAX-Exchange/disruptor

我对其进行了一个简单的测试,如下所示:

public class DisruptorMain {
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static void main(String[] args) throws Exception {
        class Element {

            private int value;

            public int get() {
                return value;
            }

            public void set(int value) {
                this.value = value;
            }

        }

        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread");
            }
        };

        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        EventHandler<Element> handler = new EventHandler<Element>() {
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch) {
                try {
                    Thread.sleep(1000 * sequence);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("Element: " + element.get());
            }
        };

        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        int bufferSize = 4;

        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        disruptor.handleEventsWith(handler);

        disruptor.start();

        RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

        for (int l = 0; l < 8; l++) {
            long sequence = ringBuffer.next();
            System.out.println("sequence:" + sequence);

            try {
                Element event = ringBuffer.get(sequence);
                event.set(l);
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

结果是: 序列:0 序列:1 序列:2 序列:3 元素:0 元素:1 元素:2 元素:3 序列:4 序列:5 序列:6 序列:7 元素:4 元素:5 元素:6 元素: 7

在我的测试中,我定义了一个环形缓冲区大小为4,并且我有一个生产者为其创建8个任务,我的问题是,当生产者将4个任务放入环形缓冲区时,消费者开始从环形缓冲区中获取任务工作起来,任务1完成后,环形缓冲区应该有一个空的空间来容纳任务5,但结果显示,只有当环形缓冲区中的所有任务都完成后,环形缓冲区才能接受新任务,为什么?

Mic*_*ker 7

这是因为 Disruptor 将批处理事件处理程序。如果事件处理程序很慢或环形缓冲区很小,则批处理大小通常可以是环形缓冲区的大小。中断器只会更新该事件处理程序的处理序列,直到批处理完成。这减少了发布者用来确定空间是否可用而需要对序列变量进行的更新次数。如果您需要早于默认空间提供可用空间,则可以使用 SequenceReportingEventHandler 来实现。

public class MyEventHandler implements SequenceReportingEventHandler<Element> {
    Sequence processedSequence;

    public void setSequenceCallback(Sequence s) {
        processedSequence = s;
    }

    public void onEvent(Element e, long sequence, boolean endOfBatch) {
        // Do stuff
        processedSequence.set(sequence);
    }
}
Run Code Online (Sandbox Code Playgroud)