如何使用ServiceBus EventData Offset Value

Pau*_*els 11 c# azure azure-eventhub .net-4.7

我有一些使用服务总线事件数据的代码,我怀疑我需要使用offset属性,因为目前我的程序(或似乎是)一遍又一遍地重新运行相同的事件中心数据.

我的代码如下:

public class EventHubListener : IEventProcessor
{
    private static EventHubClient _eventHubClient;        
    private const string EhConnectionStringNoPath = "Endpoint=...";
    private const string EhConnectionString = EhConnectionStringNoPath + ";...";
    private const string EhEntityPath = "...";        

    public void Start()
    {
        _eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
        EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();            
        EventHubDescription eventHub = NamespaceManager.CreateFromConnectionString(EhConnectionStringNoPath).GetEventHub(EhEntityPath);

        foreach (string partitionId in eventHub.PartitionIds)
        {
            defaultConsumerGroup.RegisterProcessor<EventHubListener>(new Lease
            {
                PartitionId = partitionId
            }, new EventProcessorCheckpointManager());

            Console.WriteLine("Processing : " + partitionId);
        }
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData eventData in messages)
        {                
            string bytes = Encoding.UTF8.GetString(eventData.GetBytes());
            MyData data = JsonConvert.DeserializeObject<MyData>(bytes);
Run Code Online (Sandbox Code Playgroud)

当我一遍又一遍地收到相同的消息时,我怀疑我需要做这样的事情:

string bytes = Encoding.UTF8.GetString(eventData.GetBytes(), eventData.Offset, eventData.SerializedSizeInBytes - eventData.Offset);
Run Code Online (Sandbox Code Playgroud)

但是,Offset是一个字符串,即使它似乎是一个数值(例如"12345").关于context.CheckPointAsync()它的文档似乎可能是答案; 然而,在循环结束时发出它似乎没有任何区别.

所以,我有两个问题:

  1. 什么是抵消?它是我认为它(即流中一个点的数字标记),如果是这样,为什么它是一个字符串?
  2. 为什么我会再次收到相同的消息?据我了解事件中心,虽然他们至少保证一次,但一旦检查点出现问题,我就不应该收到相同的消息.

编辑:

经过一段时间的捣乱,我想出了一些可以避免这个问题的东西; 但是,我当然不会声称它是一个解决方案:

var filteredMessages =
            messages.Where(a => a.EnqueuedTimeUtc >= _startDate)
            .OrderBy(a => a.EnqueuedTimeUtc);
Run Code Online (Sandbox Code Playgroud)

使用EventProcessorHost似乎实际上使问题更严重; 也就是说,不仅重播历史事件,而且它们似乎以随机顺序重播.

编辑:

我偶然发现了@Mikhail的这篇优秀文章,它确实解决了我的确切问题.然而; 并且可能是我问题的根源(或其中之一,假设这是正确的,那么我不确定为什么使用EventProcessorHost它不仅仅是开箱即用,因为@Mikhail在评论中说自己).但是,ServiceBus版本ICheckpointManager只有一个接口方法:

namespace Microsoft.ServiceBus.Messaging
{

    public interface ICheckpointManager
    {
        Task CheckpointAsync(Lease lease, string offset, long sequenceNumber);
    }
}
Run Code Online (Sandbox Code Playgroud)

War*_*Zhu 1

您的标题应该是事件中心,而不是服务总线。对于你的问题:

  1. 虽然 Event Hub 的设计与 Kafka 类似,但一个很大的区别是您应该自己管理偏移量。事件中心代理完全不知道您的消费者组的偏移量。
  2. 因此Event Hub sdk提供了一些帮助类来存储存储帐户中的偏移量,但在处理消息后您仍然需要手动调用检查点。