强制EventProcessorHost将失败的Azure Event Hub eventData重新传递给IEventProcessor.ProcessEvents方法

Chr*_*sgh 35 .net c# azure azure-eventhub

该应用程序使用.NET 4.6.1和Microsoft.Azure.ServiceBus.EventProcessorHost nuget软件包v2.0.2及其依赖WindowsAzure.ServiceBus软件包v3.0.1来处理Azure事件中心消息.

该应用程序有一个实现IEventProcessor.当从ProcessEventsAsync方法抛出未处理的异常时,EventProcessorHost永远不会将这些消息重新发送到正在运行的实例IEventProcessor.(有趣的是,如果托管应用程序停止并重新启动,或者租约丢失并重新获得,它将重新发送.)

有没有办法强制将导致异常的事件消息重新发送EventProcessorHostIEventProcessor实现?

本评论中提供了一个可能的解决方案,该问题几乎完全相同: IEventProcessor.ProcessEventsAsync中的Redeliver未处理的EventHub消息

该评论建议保留最后一个成功处理的事件消息的副本,并在发生异常时显式使用该消息进行检查点ProcessEventsAsync.但是,在实施和测试这样的解决方案后,EventProcessorHost仍然不会重新发送.实现非常简单:

private EventData _lastSuccessfulEvent;

public async Task ProcessEventsAsync(
    PartitionContext context,
    IEnumerable<EventData> messages)
{
    try
    {
        await ProcessEvents(context, messages);     // does actual processing, may throw exception
        _lastSuccessfulEvent = messages
            .OrderByDescending(ed => ed.SequenceNumber)
            .First();
    }
    catch(Exception ex)
    {
        await context.CheckpointAsync(_lastSuccessfulEvent);
    }
}
Run Code Online (Sandbox Code Playgroud)

对行动中的事情的分析: 在此输入图像描述

此处提供部分日志示例:https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt

Sre*_*ati 14

TLDR:唯一可靠的方式重新上场失败的批次事件到的IEventProcessor.ProcessEventsAsync是- ShutdownEventProcessorHost(又名EPH)立即 -无论是使用eph.UnregisterEventProcessorAsync()终止进程 -基于的情况.这将允许其他EPH实例获取此分区的租约并从上一个检查点开始.

在解释之前 - 我想说出来,这是一个很好的问题,实际上,这是我们必须做出的最艰难的设计选择之一EPH.在我看来,这是一个折中的B/W:usability/ supportability中的EPH框架,过Technical-Correctness.

理想情况应该是:当用户代码IEventProcessorImpl.ProcessEventsAsync抛出异常时 - EPH库不应该捕获这个.应该让这个Exception- 崩溃过程和crash-dump清楚地显示callstack负责任.我仍然相信 - 这是最好的technically-correct解决方案.

现状:IEventProcessorImpl.ProcessEventsAsyncAPI 的合同EPH是,

  1. 只要EventData可以从EventHubs服务接收- 继续IEventProcessorImplementation.ProcessEventsAsync使用EventData's&调用user-callback(),如果user-callback在调用时抛出错误,则通知EventProcessorOptions.ExceptionReceived.
  2. 内部的用户代码IEventProcessorImpl.ProcessEventsAsync应处理所有错误并Retry's根据需要合并.EPH不会在此回调上设置任何超时,以便用户完全控制处理时间.
  3. 如果一个特定的事件是故障原因-纪念EventData具有特殊属性-为前:类型= poison-event并重新发送到同一个EventHub(包含一个指向实际的事件,复制这些EventData.OffsetSequenceNumber进入新的EventData.ApplicationProperties),或者它FWD到SERVICEBUS将其排队或存储在其他地方,基本上,识别并推迟处理毒物事件.
  4. 如果您处理了所有可能的情况并且仍在运行Exceptions- catch'em&shutdown EPHfailfast具有此异常的进程.当EPH它恢复时 - 它将从它左边开始.

为什么检查点"老事件"不起作用(读要了解EPH一般):

在幕后,EPH每个EventHub Consumergroup分区的接收器运行一个泵 - 其工作是从给定的checkpoint(如果存在)启动接收器并创建一个专用的IEventProcessor实现实例,然后receiveOffset检查点中指定的指定EventHub分区(如果不存在 - EventProcessorOptions.initialOffsetProvider)并最终调用IEventProcessorImpl.ProcessEventsAsync.CheckpointEPH进程Shutsdown和Partition的所有权被移动到另一个EPH实例时,其目的是能够可靠地开始处理消息.因此,checkpoint只有在启动时才会消耗,并且一旦泵启动就不会被读取.

正如我写的那样,EPH版本是2.2.10 ......

  • 通过UnregisterEventProcessorAsync关闭是一个被考虑的东西,是我倾向于的方法.感谢您的详细回复,验证了这种方法. (2认同)