如何使用disruptor模式实现解复用器?

Joh*_*ine 0 java queue real-time message-queue disruptor-pattern

我希望有一个环形缓冲区队列,它将接收对象并将它们从一个线程池中的多个线程分布到一个生产者中,然后分配给多个消费者时尚.如何使用disruptor模式实现这一目标?任何HelloDemux代码示例?谢谢!!!

rda*_*ida 6

文章详细介绍有关实现破坏者模式解复用器的一切,但我觉得一个线程池意味着你将需要一个调度员,这违背了破坏者的模式.要实现demux,您需要设置固定数量的使用者线程,而不是池,并让它们从队列尾部获取消息.现在,您可能会问,如果没有调度员,他们怎么能这样做?他们只是在队列尾巴周围忙着旋转(或使用其他类型的等待策略,包括旋转,屈服,停车,睡觉等组合).现在,您可能会问,他们如何在不相互踩踏的情况下做到这一点?然后你有两个选择:你可以使用MODULUS(无锁)或CAS(轻锁).每个人都有自己的优点和缺点.MODULUS很快,但如果一个消费者落后,可能会引起车道争用.CAS不是那么快但不会引起车道争用.

package com.coralblocks.coralqueue.sample.demux;

import com.coralblocks.coralqueue.demux.CASAtomicDemux;
import com.coralblocks.coralqueue.demux.Demux;

public class Sample {

    private static final int NUMBER_OF_CONSUMERS = 4;

    public static void main(String[] args) throws InterruptedException {

        final Demux<StringBuilder> queue = new CASAtomicDemux<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_CONSUMERS);

        Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];

        for(int i = 0; i < consumers.length; i++) {

            final int index = i;

            consumers[i] = new Thread() {

                @Override
                public void run() {

                    boolean running = true;

                    while(running) {
                        long avail;
                        while((avail = queue.availableToPoll(index)) == 0); // busy spin
                        for(int i = 0; i < avail; i++) {
                            StringBuilder sb = queue.poll(index);

                            if (sb == null) break; // mandatory for demuxes!

                            if (sb.length() == 0) {
                                running = false;
                                break; // exit immediately...
                            } else {
                                System.out.println(sb.toString());
                            }
                        }
                        queue.donePolling(index);
                    }
                }
            };

            consumers[i].start();
        }

        StringBuilder sb;

        for(int i = 0; i < 10; i++) {
            while((sb = queue.nextToDispatch()) == null); // busy spin
            sb.setLength(0);
            sb.append("message ").append(i);
            queue.flush();
        }

        // send a message to stop consumers...
        for(int i = 0; i < NUMBER_OF_CONSUMERS; i++) {
            // because the consumer exit immediately on this message, each
            // consumer will get one of these messages and exit...
            while((sb = queue.nextToDispatch()) == null); // busy spin
            sb.setLength(0);
        }
        queue.flush(); // sent batch

        for(int i = 0; i < consumers.length; i++) consumers[i].join();
    }
}
Run Code Online (Sandbox Code Playgroud)