LMAX的破坏模式如何运作?

Sha*_*baz 202 concurrency latency actor disruptor-pattern

我试图理解破坏者的模式.我看过InfoQ视频,试图阅读他们的论文.我知道有一个环形缓冲区,它被初始化为一个非常大的数组,以利用缓存局部性,消除新内存的分配.

听起来有一个或多个原子整数可以跟踪位置.每个"事件"似乎都得到一个唯一的id,它在环中的位置是通过找到与模数相关的模数等来找到的.

不幸的是,我没有直观的感觉它是如何工作的.我做了很多交易应用程序并研究了演员模型,看过SEDA等.

在他们的演讲中他们提到这种模式基本上是路由器的工作方式; 但是我还没有找到路由器如何工作的任何好的描述.

是否有一些更好解释的好指示?

Mic*_*ker 209

Google Code项目确实引用了关于环形缓冲区实现的技术论文,但对于想要了解其工作原理的人来说,它有点干,学术性和难度.然而,有一些博客文章已经开始以更易读的方式解释内部.有一个关于环形缓冲区解释,它是破坏者模式的核心,描述了消费者障碍(与从破坏者中读取相关的部分)和一些关于处理多个生产者的信息.

Disruptor最简单的描述是:它是一种以尽可能最有效的方式在线程之间发送消息的方法.它可以用作队列的替代品,但它也与SEDA和Actors共享许多功能.

与队列相比:

Disruptor提供了将消息传递到另一个线程的能力,如果需要可以将其唤醒(类似于BlockingQueue).但是,有3个明显的差异.

  1. Disruptor的用户通过扩展Entry类并提供工厂来进行预分配来定义消息的存储方式.这允许内存重用(复制)或Entry可以包含对另一个对象的引用.
  2. 将消息放入Disruptor是一个两阶段的过程,首先在环形缓冲区中声明一个槽,它为用户提供可以用适当的数据填充的Entry.然后必须提交该条目,这种两阶段方法对于允许灵活使用上述存储器是必要的.提交使消息对消费者线程可见.
  3. 消费者有责任跟踪从环形缓冲区消耗的消息.将此责任从环形缓冲区本身移开有助于减少写入争用的数量,因为每个线程都维护自己的计数器.

与演员相比

与大多数其他编程模型相比,Actor模型更接近Disruptor,特别是如果您使用提供的BatchConsumer/BatchHandler类.这些类隐藏了维护消耗的序列号的所有复杂性,并在发生重要事件时提供一组简单的回调.但是,有一些细微的差别.

  1. Disruptor使用1个线程 - 1个消费者模型,其中Actors使用N:M模型,即您可以拥有任意数量的角色,并且它们将分布在固定数量的线程中(通常每个核心1个).
  2. BatchHandler接口提供了一个额外的(非常重要的)回调onEndOfBatch().这允许缓慢的消费者,例如那些一起进行批处理事件的I/O以提高吞吐量.可以在其他Actor框架中进行批处理,但由于几乎所有其他框架都不在批处理结束时提供回调,因此需要使用超时来确定批处理的结束,从而导致延迟较差.

与SEDA相比

LMAX构建了Disruptor模式来取代基于SEDA的方法.

  1. 它为SEDA提供的主要改进是能够并行工作.为此,Disruptor支持向多个消费者多次转发相同的消息(以相同的顺序).这避免了管道中对叉级的需要.
  2. 我们还允许消费者等待其他消费者的结果,而不必在他们之间放置另一个排队阶段.消费者可以简单地观察它所依赖的消费者的序列号.这避免了管道中连接阶段的需要.

与记忆障碍相比

另一种思考方式是作为一种结构化的有序内存屏障.生产者屏障形成写屏障,消费者屏障是读屏障.


irr*_*ble 135

首先,我们想了解它提供的编程模型.

有一个或多个作家.有一个或多个读者.有一系列条目,完全从旧到新排序(如左图所示).作家可以在右端添加新条目.每个读者从左到右依次读取条目.显然,读者无法阅读过去的作家.

没有条目删除的概念.我使用"阅读器"而不是"消费者"来避免使用条目的图像.但是我们知道最后一个读者左边的条目变得毫无用处.

一般来说,读者可以同时独立阅读.但是我们可以在读者之间声明依赖关系.读者依赖可以是任意非循环图.如果读卡器B依赖于读卡器A,则读卡器B无法读取读卡器A.

读者依赖性产生是因为读者A可以注释一个条目,而读者B依赖于该注释.例如,A对条目进行一些计算,并将结果存储a在条目的字段中.然后继续前进,现在B可以读取条目,并a存储A 的值.如果读者C不依赖于A,则C不应该尝试阅读a.

这确实是一个有趣的编程模型.无论性能如何,单独的模型都可以使许多应用程序受益.

当然,LMAX的主要目标是性能.它使用预先分配的条目环.环足够大,但是它的界限使得系统不会超出设计容量.如果戒指已满,作家将等到最慢的读者前进并腾出空间.

入口对象是预先分配并永久存在的,以减少垃圾收集成本.我们不插入新的条目对象或删除旧的条目对象,而是写入者要求预先存在的条目,填充其字段并通知读者.这种明显的两阶段行动实际上只是一种原子行为

setNewEntry(EntryPopulator);

interface EntryPopulator{ void populate(Entry existingEntry); }
Run Code Online (Sandbox Code Playgroud)

预分配条目还意味着相邻条目(很可能)位于相邻的存储器单元中,并且因为读取器顺序读取条目,所以这对于利用CPU高速缓存很重要.

并且努力避免锁定,CAS,甚至内存屏障(例如,如果只有一个编写器,则使用非易失性序列变量)

对于读者的开发人员:不同的注释读者应该写入不同的字段,以避免写入争用.(实际上他们应该写入不同的缓存行.)注释读者不应该触及其他非依赖读者可能阅读的内容.这就是我说这些读者注释条目而不是修改条目的原因.

  • +1这是唯一一个试图描述破坏者模式如何实际运作的答案,正如OP所说的那样. (21认同)
  • 对我来说还好。我喜欢使用“注释”一词。 (2认同)

Chu*_*ucK 41

Martin Fowler撰写了一篇关于LMAX和破坏者模式LMAX架构的文章,该文章可能会进一步阐明.


小智 17

我实际上是花时间研究实际的来源,出于纯粹的好奇心,其背后的想法非常简单.撰写本文时的最新版本是3.2.1.

存在一个缓冲区,用于存储预先分配的事件,这些事件将保存数据供消费者阅读.

缓冲区由其长度的标志数组(整数数组)支持,该数组描述缓冲区插槽的可用性(有关详细信息,请参阅更多信息).该数组的访问方式类似于java#AtomicIntegerArray,因此为了进行此扩展,您可以将其视为一个.

可以有任意数量的生产者.当生产者想要写入缓冲区时,会生成一个长数字(如在调用AtomicLong#getAndIncrement时,Disruptor实际上使用它自己的实现,但它以相同的方式工作).让我们称这个生成的long为producerCallId.以类似的方式,当消费者ENDS从缓冲区读取时隙时,生成consumerCallId.访问最新的consumerCallId.

(如果有很多消费者,则选择ID最低的呼叫.)

然后比较这些id,如果两者之间的差异小于缓冲器侧,则允许生产者写入.

(如果producerCallId大于最近的consumerCallId + bufferSize,则意味着缓冲区已满,并且生产者被迫进行总线等待,直到某个点变为可用.)

然后根据他的callId(即prducerCallId modulo bufferSize)为生产者分配缓冲区中的槽,但由于bufferSize总是2的幂(在缓冲区创建时强制执行限制),因此使用的执行操作是producerCallId&(bufferSize - 1) )).然后可以自由修改该槽中的事件.

(实际的算法有点复杂,包括将最近的consumerId缓存在单独的原子引用中,以进行优化.)

修改事件后,更改将"已发布".当发布标志数组中的相应插槽时,将填充更新的标志.标志值是循环的数量(producerCallId除以bufferSize(同样,因为bufferSize是2的幂,实际操作是右移).

以类似的方式,可以有任何数量的消费者.每次消费者想要访问缓冲区时,都会生成consumerCallId(取决于将消费者添加到破坏者的方式,id生成中使用的原子可以为每个用户共享或分离).然后将此consumerCallId与最近的producentCallId进行比较,如果它是两者中的较小者,则允许读者进步.

(类似地,如果producerCallId甚至是consumerCallId,则意味着缓冲区是安全的,并且消费者被迫等待.等待的方式由在创建破坏者期间的WaitStrategy定义.)

对于个人消费者(具有他们自己的id生成器的消费者),接下来检查的是批量消费的能力.缓冲区中的槽按顺序从对应的customerCallId(索引以与生产者相同的方式确定)的顺序检查到与最近的producerCallId相对应的槽.

通过比较标志数组中写入的标志值与为consumerCallId生成的标志值,在循环中检查它们.如果标志匹配,则意味着填充插槽的生产者已经提交了他们的更改.如果不是,则循环中断,并返回最高的提交changeId.从ConsumerCallId到changeId中接收的插槽可以批量使用.

如果一组消费者一起阅读(具有共享id生成器的消费者),则每个消费者只接受一个callId,并且仅检查并返回该单个callId的插槽.


rda*_*ida 7

这篇文章:

破坏者模式是由圆形阵列(即环形缓冲区)备份的批处理队列,其填充有预先分配的传输对象,其使用存储器障碍来通过序列来同步生产者和消费者.

内存障碍有点难以解释,Trisha的博客在我看来做了最好的尝试:http://mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast. HTML

但是如果你不想深入了解低级细节,你可以知道Java中的内存障碍是通过volatile关键字或通过关键字实现的java.util.concurrent.AtomicLong.破坏者模式序列是AtomicLongs并且通过记忆障碍而不是锁来在生产者和消费者之间来回传递.

我发现通过代码理解概念更容易,因此下面的代码是来自CoralQueue的一个简单的helloworld,它是由我所属的CoralBlocks完成的破坏程序模式实现.在下面的代码中,您可以看到破坏程序模式如何实现批处理以及环形缓冲区(即循环数组)如何允许两个线程之间的无垃圾通信:

package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;

public class Sample {

    public static void main(String[] args) throws InterruptedException {

        final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);

        Thread consumer = new Thread() {

            @Override
            public void run() {

                boolean running = true;

                while(running) {
                    long avail;
                    while((avail = queue.availableToPoll()) == 0); // busy spin
                    for(int i = 0; i < avail; i++) {
                        MutableLong ml = queue.poll();
                        if (ml.get() == -1) {
                            running = false;
                        } else {
                            System.out.println(ml.get());
                        }
                    }
                    queue.donePolling();
                }
            }

        };

        consumer.start();

        MutableLong ml;

        for(int i = 0; i < 10; i++) {
            while((ml = queue.nextToDispatch()) == null); // busy spin
            ml.set(System.nanoTime());
            queue.flush();
        }

        // send a message to stop consumer...
        while((ml = queue.nextToDispatch()) == null); // busy spin
        ml.set(-1);
        queue.flush();

        consumer.join(); // wait for the consumer thread to die...
    }
}
Run Code Online (Sandbox Code Playgroud)