我正在忙着EventProcessorHost为一个天蓝色的EventBus客户端实现一个客户端.
我有一个实现IEventProcessor如下的类:
public class MyEventProcessor : IEventProcessor
{
Stopwatch checkpointStopWatch;
//TODO: get provider id from parent class
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
Debug.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
public Task OpenAsync(PartitionContext context)
{
Debug.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
eventHandler = new MyEventHandler();
this.checkpointStopWatch = new Stopwatch();
this.checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach …Run Code Online (Sandbox Code Playgroud) 我使用事件中心处理器主机来接收和处理来自事件中心的事件。为了获得更好的性能,我每 3 分钟调用一次检查点,而不是每次接收事件时调用检查点:
public async Task ProcessEventAsync(context, messages)
{
foreach (var eventData in messages)
{
// do something
}
if (checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
{
await context.CheckpointAsync();
}
}
Run Code Online (Sandbox Code Playgroud)
但问题是,如果没有新事件发送到事件中心,可能有些事件永远不会成为检查点,因为如果没有新消息,则不会调用 ProcessEventAsync。
有什么建议可以确保所有处理的事件都是检查点,但仍然每隔几分钟检查点?
更新:根据 Sreeram 的建议,我更新了代码,如下所示:
public async Task ProcessEventAsync(context, messages)
{
foreach (var eventData in messages)
{
// do something
}
this.lastProcessedEventsCount += messages.Count();
if (this.checkpointStopWatth.Elapsed > TimeSpan.FromMinutes(3);
{
this.checkpointStopWatch.Restart();
if (this.lastProcessedEventsCount > 0)
{
await context.CheckpointAsync();
this.lastProcessedEventsCount = 0;
}
}
}
Run Code Online (Sandbox Code Playgroud) 语境
我们当前正在使用Microsoft.Azure.ServiceBus.EventProcessorHost从Azure Event-Hub提取数据。我们在.NET框架上运行,而不是.NET Core。
Microsoft的此公告(于2017年2月2日发布)建议更新的Microsoft.Azure.EventHubs.Processor是将来的发展方式,无论您使用.NET运行时如何,因为它们将维护单个代码库。
同时,最近更新的官方Microsoft Event-Hub示例仍然建议用于.NET框架的旧库(Microsoft.Azure.ServiceBus.EventProcessorHost)。
自宣布以来,两个库均已更新和发展。
题
由于我们在.NET Framework上,因此应该使用哪个库?
我们是否应该迁移到较新的Microsoft.Azure.EventHubs.Processor以便从最新开发,改进和错误修复中受益?还是旧的跟上步伐?
这个问题也与新手有关,他们应该选择哪个库来开始使用Azure Event-Hub。
我正在使用 Azure 事件中心。来自事件中心的传出消息由应用程序使用 进行处理EventProcessorHost。事件中心有一个消费者组,即$Default. 我发现传出的消息多于传入的消息。不确定为什么事件中心中的传出多于传入。