You*_*uxu 6 azure azure-eventhub event-processor-host
我使用事件中心处理器主机来接收和处理来自事件中心的事件。为了获得更好的性能,我每 3 分钟调用一次检查点,而不是每次接收事件时调用检查点:
public async Task ProcessEventAsync(context, messages)
{
foreach (var eventData in messages)
{
// do something
}
if (checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
{
await context.CheckpointAsync();
}
}
Run Code Online (Sandbox Code Playgroud)
但问题是,如果没有新事件发送到事件中心,可能有些事件永远不会成为检查点,因为如果没有新消息,则不会调用 ProcessEventAsync。
有什么建议可以确保所有处理的事件都是检查点,但仍然每隔几分钟检查点?
更新:根据 Sreeram 的建议,我更新了代码,如下所示:
public async Task ProcessEventAsync(context, messages)
{
foreach (var eventData in messages)
{
// do something
}
this.lastProcessedEventsCount += messages.Count();
if (this.checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
{
this.checkpointStopWatch.Restart();
if (this.lastProcessedEventsCount > 0)
{
await context.CheckpointAsync();
this.lastProcessedEventsCount = 0;
}
}
}
Run Code Online (Sandbox Code Playgroud)
很棒的案例 - 你正在报道!
在以下两种情况下,您可能会遭受损失event checkpoints(并因此而遭受损失):event replay
当您有稀疏数据流(例如:每 5 分钟一批消息,检查点间隔为 3 分钟)并且EventProcessorHost实例因某种原因关闭时 - 您可以看到2 min-EventData重新处理。要处理这种情况,请在收到关闭 - /通知时跟踪lastProcessedEvent完成IEventProcessor.onEvents/ & 检查点。IEventProcessor.ProcessEventsAsyncIEventProcessor.onCloseIEventProcessor.CloseAsync
可能只是有一种情况 - 没有更多特定的事件EventHubs partition。在这种情况下,您永远不会看到最后一个事件被检查点 - 使用您的Checkpointing strategy. 但是,当您有连续的流EventData并且没有发送到特定的EventHubs 分区( EventHubClient.send(EventData_Without_PartitionKey)) 时,这种情况并不常见。如果您认为 - 您可能会遇到这种情况,请使用:
EventProcessorOptions.setInvokeProcessorAfterReceiveTimeout(true); // 在 java 或 EventProcessorOptions.InvokeProcessorAfterReceiveTimeout = true; // 在 C# 中
processEventsAsync标记以经常唤醒。然后,跟踪LastProcessedEventData并根据这些事件的属性,LastCheckpointedEventData在没有收到任何消息时判断是否进行检查点。EventsEventData.SequenceNumber
| 归档时间: |
|
| 查看次数: |
2670 次 |
| 最近记录: |