Pau*_*els 11 c# azure azure-eventhub .net-4.7
我有一些使用服务总线事件数据的代码,我怀疑我需要使用offset属性,因为目前我的程序(或似乎是)一遍又一遍地重新运行相同的事件中心数据.
我的代码如下:
public class EventHubListener : IEventProcessor
{
private static EventHubClient _eventHubClient;
private const string EhConnectionStringNoPath = "Endpoint=...";
private const string EhConnectionString = EhConnectionStringNoPath + ";...";
private const string EhEntityPath = "...";
public void Start()
{
_eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();
EventHubDescription eventHub = NamespaceManager.CreateFromConnectionString(EhConnectionStringNoPath).GetEventHub(EhEntityPath);
foreach (string partitionId in eventHub.PartitionIds)
{
defaultConsumerGroup.RegisterProcessor<EventHubListener>(new Lease
{
PartitionId = partitionId
}, new EventProcessorCheckpointManager());
Console.WriteLine("Processing : " + partitionId);
}
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData eventData in messages)
{
string bytes = Encoding.UTF8.GetString(eventData.GetBytes());
MyData data = JsonConvert.DeserializeObject<MyData>(bytes);
Run Code Online (Sandbox Code Playgroud)
当我一遍又一遍地收到相同的消息时,我怀疑我需要做这样的事情:
string bytes = Encoding.UTF8.GetString(eventData.GetBytes(), eventData.Offset, eventData.SerializedSizeInBytes - eventData.Offset);
Run Code Online (Sandbox Code Playgroud)
但是,Offset
是一个字符串,即使它似乎是一个数值(例如"12345").关于context.CheckPointAsync()
它的文档似乎可能是答案; 然而,在循环结束时发出它似乎没有任何区别.
所以,我有两个问题:
编辑:
经过一段时间的捣乱,我想出了一些可以避免这个问题的东西; 但是,我当然不会声称它是一个解决方案:
var filteredMessages =
messages.Where(a => a.EnqueuedTimeUtc >= _startDate)
.OrderBy(a => a.EnqueuedTimeUtc);
Run Code Online (Sandbox Code Playgroud)
使用EventProcessorHost
似乎实际上使问题更严重; 也就是说,不仅重播历史事件,而且它们似乎以随机顺序重播.
编辑:
我偶然发现了@Mikhail的这篇优秀文章,它确实解决了我的确切问题.然而; 并且可能是我问题的根源(或其中之一,假设这是正确的,那么我不确定为什么使用EventProcessorHost
它不仅仅是开箱即用,因为@Mikhail在评论中说自己).但是,ServiceBus版本ICheckpointManager
只有一个接口方法:
namespace Microsoft.ServiceBus.Messaging
{
public interface ICheckpointManager
{
Task CheckpointAsync(Lease lease, string offset, long sequenceNumber);
}
}
Run Code Online (Sandbox Code Playgroud)
您的标题应该是事件中心,而不是服务总线。对于你的问题:
归档时间: |
|
查看次数: |
232 次 |
最近记录: |