我知道有些人正在使用C++端口.
C#端口可能吗?特别是我在考虑C#中仅有32位的易失性字段的限制.如果这是唯一的问题,那么有什么想认为将该环形缓冲区编写为只允许32位允许的时隙数是值得的.这在系统的生命周期中发生的事件要少得多.我有这个权利吗?我们可以在某个时候回到0吗?
先感谢您.
破坏者实际上是应用程序而不是JMS的替代品吗?我目前使用JMS消息传输任务.
破坏者是否打算将其替换为邮件传输?每个人的利弊是什么?
目前我使用JMS,我有一个生产者将消息发送到队列和消费者(MDB)将它们从队列中拉出来.
谢谢.
如何监控LMAX Disruptor?假设我有 3 个环形缓冲区,并希望提供一个 ui 来为我提供环形缓冲区的信息。
我正在调查LMAX Disruptor的源代码,我进入了RingBuffer
抽象类.为什么正好有7个长场(p1 ... p7)RingBufferPad
?这是实际代码:https:
//github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/RingBuffer.java
abstract class RingBufferPad
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields<E> extends RingBufferPad
{
....
Run Code Online (Sandbox Code Playgroud) HY,
目前我正在开发一个程序,它从amq队列获取2个值并对它们执行一系列数学计算.已在我的程序订阅的amq服务器上创建了一个主题,并通过回调(侦听器)接收消息.
现在,只要消息到达,就会从SynchronizedDescriptiveStatistics对象中取出并添加两个值.在每次添加到值列表之后,重新执行整个计算序列(这实际上是需求的一部分).
我现在面临的问题是,由于我使用的是侦听器,因此在计算过程中有时会收到一条或多条消息.尽管SynchronizedDescriptiveStatistics会自行处理所有与线程相关的问题,但它会在锁定或其他内容时立即在其数字列表中添加所有等待值.虽然我的问题是添加一个值,然后对其执行calcls然后再执行第二个值.
我想出的解决方案是在我的程序中使用作业队列(而不是amq队列).通过这种方式,只要计算结束,程序就会在队列中寻找更多的工作并继续相应的工作.
由于我也在寻找效率和速度,我认为Disruptor框架可能对这个问题有好处,并且它针对线程情况进行了优化.但我不确定它是否值得在我的应用程序中实现Disruptor的麻烦,因为常规标准队列可能足以满足我的目的.
我还要告诉你,需要执行计算的数据很多,并且它将继续进行,并且需要在连续方式中每次添加单个值时重复执行整个计算.因此,请记住效率和大量数据,从长远来看,您认为哪些数据有用.
等待回复...
问候.
请考虑Martin Fowler的 LMAX Architecture 描述中的以下场景:
我将使用一个简单的非LMAX示例来说明.想象一下,您正在通过信用卡订购果冻豆.<...>
在LMAX架构中,您可以将此操作拆分为两个.第一个操作将捕获订单信息并通过向信用卡公司输出事件(请求的信用卡验证)来完成.然后,业务逻辑处理器将继续为其他客户处理事件,直到它在其输入事件流中收到信用卡验证事件.在处理该事件时,它将执行该订单的确认任务.
因此,订单保留在内存中,直到收到付款处理结果.
现在让我们假设代替信用卡处理步骤,我们需要花费更多时间的步骤,例如:我们需要执行库存检查,有人必须在物理上验证我们是否已经订购了特定的果冻豆味道.这可能需要一个小时.
如果是这种情况,不会导致内存中保存的数据增长,因为很多订单可能会等待库存状态更新事件?
可能在这种情况下,我们需要从内存中删除订单并将其作为输出事件的一部分包含在内,外部系统(库存)负责生成包含订单详细信息的另一个输入事件.
我用这种方法看到的问题是,我们不能将库存作为业务逻辑处理器的一部分.
关于我们如何解决这个问题的想法?
在LMAX Disruptor模式中,复制器用于将输入事件从主节点复制到从节点.所以设置可能如下所示:
主节点的复制器将事件写入数据库(尽管我们可以考虑比写入数据库更好的机制 - 但它对问题语句并不重要).从节点的接收器从DB读取并将事件放入从节点的环形缓冲器.
从节点的输出事件被忽略.
现在,主节点的业务逻辑处理器可能比从节点的业务逻辑处理器慢.例如,主节点的BL可以在时隙102处,其中从节点可以在106处.(这可以发生,因为复制器在业务逻辑处理器之前从环形缓冲器读取事件).
在这种情况下,如果主节点发生故障并且从节点现在成为主节点,则外部系统可能会遗漏一些关键事件.这可能发生,因为节点2在充当从节点时会忽略其输出.
Martin Fowler确实说复制器的工作是保持节点同步:"之前我提到LMAX在集群中运行其系统的多个副本以支持快速故障转移.复制器使这些节点保持同步"
但我不确定它如何使Business Logic Processor保持同步?有任何想法吗?
我计划在我的破坏者中有许多并行消费者。
我需要每个消费者只消费为他们准备的消息。
例如,我有 A、B、C 类型的消息,我有类似的缓冲区
#1 - type A, #2 - type B, #3 - type C, #4 - type A, #5 - type C, #6 - type C, (and so on)
Run Code Online (Sandbox Code Playgroud)
我有每种类型的消费者。我怎样才能让 A 的消费者接受消息 1 和 4,对于类型 B - 消息 2,C - 消息 3、5、6?
重要提示:我希望处理是独立的。消费者不应该被链接起来,每个人都独立地在缓冲区中移动。如果 A 的消费者比 C 的消费者慢,则“类型 C”消费者对 #6 的处理可能会早于类型 A 的 #1 参与。
我很欣赏如何使用 LMAX 干扰器配置来做到这一点的解释。
遵循Disruptor入门指南,我建立了一个由单个生产者和单个消费者组成的最小破坏者。
制片人
import com.lmax.disruptor.RingBuffer;
public class LongEventProducer
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
public void onData()
{
long sequence = ringBuffer.next();
try
{
LongEvent event = ringBuffer.get(sequence);
}
finally
{
ringBuffer.publish(sequence);
}
}
}
Run Code Online (Sandbox Code Playgroud)
消费者(注意消费者什么也不做onEvent
)
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent>
{
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{}
}
Run Code Online (Sandbox Code Playgroud)
我的目标是对大型环缓冲区进行一次性能测试,而不是多次遍历较小的环。在每种情况下,总操作数(bufferSize
X rotations
)都是相同的。我发现随着环形缓冲区变小,操作/秒速率急剧下降。
RingBuffer Size | Revolutions | Total Ops | Mops/sec
1048576 …
Run Code Online (Sandbox Code Playgroud) Disruptor github地址为:https://github.com/LMAX-Exchange/disruptor
我对其进行了一个简单的测试,如下所示:
public class DisruptorMain {
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void main(String[] args) throws Exception {
class Element {
private int value;
public int get() {
return value;
}
public void set(int value) {
this.value = value;
}
}
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread");
}
};
EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};
EventHandler<Element> handler = …
Run Code Online (Sandbox Code Playgroud)