cs0*_*815 13 c# azureservicebus azure-eventhub
我想确保,如果我的eventhub客户端崩溃(当前是一个控制台应用程序),它只会选择尚未从eventhub获取的事件.实现这一目标的一种方法是利用抵消.但是,这(根据我的理解)要求客户端存储最新的偏移量(除了事件似乎不一定会触及SequenceNumber排序的ProcessEventsAsync方法的foreach循环).
另一种方法是使用检查点.我认为它们是使用提供的存储帐户凭据通过服务器(eventhub)保留的.它是否正确?
这是我目前使用的一些初步代码:
public class SimpleEventProcessor : IEventProcessor
{
private Stopwatch _checkpointStopWatch;
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
_checkpointStopWatch = new Stopwatch();
_checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
// do something
}
//Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
if (_checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
{
await context.CheckpointAsync();
_checkpointStopWatch.Restart();
}
}
}
Run Code Online (Sandbox Code Playgroud)
我相信它发送每隔5分钟就会向服务器创建一个检查点.服务器如何知道哪个客户端已提交检查点(通过上下文)?此外,如果客户端重新启动,如何防止再次处理事件?此外,可能仍有一个长达5分钟的窗口,其中再次处理事件.也许我应该根据我的要求使用队列/主题?
PS:
这似乎就足够了:
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
// do something
}
await context.CheckpointAsync();
}
Run Code Online (Sandbox Code Playgroud)
Sre*_*ati 22
在回答之前,我先简单介绍几个基本术语:
EventHubs是高吞吐量的持久事件摄取管道.简单地说 - 它是云中可靠的事件流.
EventData上的偏移量(流中的一个事件)实际上是流上的光标.拥有此Cursor - 将启用类似的操作 - 重新开始从此游标(也称为偏移)读取 - 包含或排除.
EventProcessor库是EventHubs团队构建的框架,在ServiceBus SDK的顶部,使"eventhub receiver gu" - 看起来更容易.ZooKeeper for Kafka < - > EPH for Event Hub.这将确保当一个特定的分区上运行EventProcessor过程中死亡/崩溃-这将是从去年检查点的偏移继续在其他可用EventProcessorHost实例- .
CheckPoint:从今天开始 - EventHubs仅支持客户端检查指向.当您从客户端代码调用Checkpoint时:
await context.CheckpointAsync();
Run Code Online (Sandbox Code Playgroud)
- 它将转换为存储调用(直接来自客户端) - 它将当前偏移存储在您提供的存储帐户中.EventHubs服务不会与Storage for Check-pointing对话.
答案
EventProcessor框架旨在实现您正在寻找的东西.
检查点不会通过服务器(即EVENTHUBS服务)保留.它纯粹是客户端.您正在与Azure存储进行通信.这就是EventProcessor库带来一个新的附加依赖 - AzureStorageClient的原因.您可以连接到存储帐户和写入检查点的容器 - 我们维护所有权信息 - EPH实例(名称)到他们拥有的EventHubs的分区以及他们当前读取/处理的检查点,直到.
根据基于计时器的检查点模式 - 您最初有 - 如果进程发生故障 - 您将在最后5分钟的窗口中重新执行事件.这是一个健康的模式:
如果你想要绝对没有事件重复 - 你需要在下游管道中构建这个重复数据删除逻辑.
HTH!SREE