Chr*_*sgh 35 .net c# azure azure-eventhub
该应用程序使用.NET 4.6.1和Microsoft.Azure.ServiceBus.EventProcessorHost nuget软件包v2.0.2及其依赖WindowsAzure.ServiceBus软件包v3.0.1来处理Azure事件中心消息.
该应用程序有一个实现IEventProcessor.当从ProcessEventsAsync方法抛出未处理的异常时,EventProcessorHost永远不会将这些消息重新发送到正在运行的实例IEventProcessor.(有趣的是,如果托管应用程序停止并重新启动,或者租约丢失并重新获得,它将重新发送.)
有没有办法强制将导致异常的事件消息重新发送EventProcessorHost给IEventProcessor实现?
本评论中提供了一个可能的解决方案,该问题几乎完全相同: IEventProcessor.ProcessEventsAsync中的Redeliver未处理的EventHub消息
该评论建议保留最后一个成功处理的事件消息的副本,并在发生异常时显式使用该消息进行检查点ProcessEventsAsync.但是,在实施和测试这样的解决方案后,EventProcessorHost仍然不会重新发送.实现非常简单:
private EventData _lastSuccessfulEvent;
public async Task ProcessEventsAsync(
PartitionContext context,
IEnumerable<EventData> messages)
{
try
{
await ProcessEvents(context, messages); // does actual processing, may throw exception
_lastSuccessfulEvent = messages
.OrderByDescending(ed => ed.SequenceNumber)
.First();
}
catch(Exception ex)
{
await context.CheckpointAsync(_lastSuccessfulEvent);
}
}
Run Code Online (Sandbox Code Playgroud)
此处提供部分日志示例:https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt
Sre*_*ati 14
TLDR:唯一可靠的方式重新上场失败的批次事件到的IEventProcessor.ProcessEventsAsync是- Shutdown在EventProcessorHost(又名EPH)立即 -无论是使用eph.UnregisterEventProcessorAsync()或终止进程 -基于的情况.这将允许其他EPH实例获取此分区的租约并从上一个检查点开始.
在解释之前 - 我想说出来,这是一个很好的问题,实际上,这是我们必须做出的最艰难的设计选择之一EPH.在我看来,这是一个折中的B/W:usability/ supportability中的EPH框架,过Technical-Correctness.
理想情况应该是:当用户代码IEventProcessorImpl.ProcessEventsAsync抛出异常时 - EPH库不应该捕获这个.应该让这个Exception- 崩溃过程和crash-dump清楚地显示callstack负责任.我仍然相信 - 这是最好的technically-correct解决方案.
现状:IEventProcessorImpl.ProcessEventsAsyncAPI 的合同EPH是,
EventData可以从EventHubs服务接收- 继续IEventProcessorImplementation.ProcessEventsAsync使用EventData's&调用user-callback(),如果user-callback在调用时抛出错误,则通知EventProcessorOptions.ExceptionReceived.IEventProcessorImpl.ProcessEventsAsync应处理所有错误并Retry's根据需要合并.EPH不会在此回调上设置任何超时,以便用户完全控制处理时间.EventData具有特殊属性-为前:类型= poison-event并重新发送到同一个EventHub(包含一个指向实际的事件,复制这些EventData.Offset并SequenceNumber进入新的EventData.ApplicationProperties),或者它FWD到SERVICEBUS将其排队或存储在其他地方,基本上,识别并推迟处理毒物事件.Exceptions- catch'em&shutdown EPH或failfast具有此异常的进程.当EPH它恢复时 - 它将从它左边开始.
为什么检查点"老事件"不起作用(读这要了解EPH一般):
在幕后,EPH每个EventHub Consumergroup分区的接收器运行一个泵 - 其工作是从给定的checkpoint(如果存在)启动接收器并创建一个专用的IEventProcessor实现实例,然后receive从Offset检查点中指定的指定EventHub分区(如果不存在 - EventProcessorOptions.initialOffsetProvider)并最终调用IEventProcessorImpl.ProcessEventsAsync.Checkpoint当EPH进程Shutsdown和Partition的所有权被移动到另一个EPH实例时,其目的是能够可靠地开始处理消息.因此,checkpoint只有在启动泵时才会消耗,并且一旦泵启动就不会被读取.
正如我写的那样,EPH版本是2.2.10 ......
| 归档时间: |
|
| 查看次数: |
1721 次 |
| 最近记录: |