Ste*_*doo 2 c# asynchronous disruptor-pattern
我在 C# 应用程序中使用Disruptor-net 。我在理解如何在干扰器模式中执行异步操作时遇到一些困难。
假设我有一些事件处理程序,并且链中的最后一个将消息传递给我的业务逻辑处理器,那么我如何处理业务逻辑处理器内部的异步操作?当我的业务逻辑需要进行一些数据库插入时,它是否会将消息传递给我的输出中断器,由它进行插入,然后在我的输入中断器上发布一条新消息,其中包含所有状态以继续事务?
此外,在我的输出干扰器中,我会使用任务吗?我 99.9% 确定我想要使用任务,这样我就不会遇到大量阻塞异步操作的事件处理程序。那么,这与颠覆者模式有何契合呢?在我的 EventHandler 中做这样的事情似乎有点奇怪。
void OnEvent(MyEvent evt, long sequence, bool endOfBatch)
{
db.InsertAsync(evt).ContinueWith(task => inputDisruptor.Publish(task));
}
Run Code Online (Sandbox Code Playgroud)
干扰器具有以下特点:
Your code sample does not really follow the Disruptor philosophy:
Task.ContinueWith runs asynchronously by default, so the continuation will use thread-pool threads.TaskContinuationOptions.ExecuteSynchronously, you have no guarantee that InsertAsync will invoke the continuations in-order.I will put aside the fact that your code is generating heap allocations. You will not benefit from the "no GC pauses" effect but it is probably very acceptable for your use-case.
Also, please note that batching is crucial to support high-throughput for IO operations. You should really use the Disruptor batches in your event handler.
I will simplify the problem to 3 event handlers:
当然,我假设插入后逻辑必须仅在插入完成后运行。
如果您的目标是将事件保存在 中InsertEventHandler并在下一个处理程序中处理事件之前阻塞直到完成,那么您可能应该在 中等待InsertEventHandler。
InsertEventHandler:
void OnEvent(MyEvent evt, long sequence, bool endOfBatch)
{
_pendingInserts.Add((evt, task: db.InsertAsync(evt)));
if (endOfBatch)
{
var insertSucceeded = Task.WaitAll(_pendingInserts.Select(x => x.task).ToArray(), _insertTimeout);
foreach (var (pendingEvent, _) in _pendingInserts)
{
pendingEvent.InsertSucceeded = insertSucceeded;
}
_pendingInserts.Clear();
}
}
Run Code Online (Sandbox Code Playgroud)
当然,如果您的数据库 API 公开了批量插入方法,那么最好将事件添加到列表中并在批处理结束时将它们全部保存。
还有许多其他选项,例如等待PostInsertEventHandler,或在另一个 Disruptor 中对插入结果进行排队,每个选项都有自己的优点和缺点。SO 答案可能不是讨论和分析所有这些问题的最佳场所。
| 归档时间: |
|
| 查看次数: |
482 次 |
| 最近记录: |