理解检查指向eventhub

cs0*_*815 13 c# azureservicebus azure-eventhub

我想确保,如果我的eventhub客户端崩溃(当前是一个控制台应用程序),它只会选择尚未从eventhub获取的事件.实现这一目标的一种方法是利用抵消.但是,这(根据我的理解)要求客户端存储最新的偏移量(除了事件似乎不一定会触及SequenceNumber排序的ProcessEventsAsync方法的foreach循环).

另一种方法是使用检查点.我认为它们是使用提供的存储帐户凭据通过服务器(eventhub)保留的.它是否正确?

这是我目前使用的一些初步代码:

public class SimpleEventProcessor : IEventProcessor
{
    private Stopwatch _checkpointStopWatch;

    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
        _checkpointStopWatch = new Stopwatch();
        _checkpointStopWatch.Start();
        return Task.FromResult<object>(null);
    }

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            // do something                    
        }

        //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
        if (_checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
        {
            await context.CheckpointAsync();
            _checkpointStopWatch.Restart();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

我相信它发送每隔5分钟就会向服务器创建一个检查点.服务器如何知道哪个客户端已提交检查点(通过上下文)?此外,如果客户端重新启动,如何防止再次处理事件?此外,可能仍有一个长达5分钟的窗口,其中再次处理事件.也许我应该根据我的要求使用队列/主题?

PS:

这似乎就足够了:

async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    foreach (var eventData in messages)
    {
        // do something
    }
    await context.CheckpointAsync();
}
Run Code Online (Sandbox Code Playgroud)

Sre*_*ati 22

在回答之前,我先简单介绍几个基本术语:

EventHubs是高吞吐量的持久事件摄取管道.简单地说 - 它是云中可靠的事件流.

EventData上的偏移量(流中的一个事件)实际上是流上的光标.拥有此Cursor - 将启用类似的操作 - 重新开始从此游标(也称为偏移)读取 - 包含或排除.

EventProcessor库是EventHubs团队构建的框架,在ServiceBus SDK的顶部,使"eventhub receiver gu" - 看起来更容易.ZooKeeper for Kafka < - > EPH for Event Hub.这将确保当一个特定的分区上运行EventProcessor过程中死亡/崩溃-这将是从去年检查点的偏移继续在其他可用EventProcessorHost实例- .

CheckPoint:从今天开始 - EventHubs仅支持客户端检查指向.当您从客户端代码调用Checkpoint时:

await context.CheckpointAsync();
Run Code Online (Sandbox Code Playgroud)

- 它将转换为存储调用(直接来自客户端) - 它将当前偏移存储在您提供的存储帐户中.EventHubs服务不会与Storage for Check-pointing对话.

答案

EventProcessor框架旨在实现您正在寻找的东西.

检查点不会通过服务器(即EVENTHUBS服务)保留.它纯粹是客户端.您正在与Azure存储进行通信.这就是EventProcessor库带来一个新的附加依赖 - AzureStorageClient的原因.您可以连接到存储帐户和写入检查点的容器 - 我们维护所有权信息 - EPH实例(名称)到他们拥有的EventHubs的分区以及他们当前读取/处理的检查点,直到.

根据基于计时器的检查点模式 - 您最初有 - 如果进程发生故障 - 您将在最后5分钟的窗口中重新执行事件.这是一个健康的模式:

  1. 基本假设是故障是罕见的事件 - 因此您很少会处理重复事件
  2. 你最终会减少对存储服务的调用(你可以通过经常检查来轻松压倒).我会更进一步,实际上,将异步触发检查点调用.如果检查点失败,OnProcessEvents不会失败!

如果你想要绝对没有事件重复 - 你需要在下游管道中构建这个重复数据删除逻辑.

  • 每次EventProcessorImpl启动时 - 查询下游的最后一个序列号.它得到并保持丢弃事件,直到当前序列没有.

HTH!SREE

  • 简而言之 - 只需要明确一点,EventHubs服务 - 完全不知道您正在检查Azure存储.EventProcessor库 - 仅使用Azure存储库帮助检查Offset(以及跨多个实例管理租约). (2认同)