Jos*_*rra 4 azure azure-eventhub
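We use EventProcessorHost to receive events from Azure Event Hubs. I have been trying to configure it (via EventProcessorOptions.InitialOffsetProvider) to read events from the current UTC time onward, but it always reads from the beginning of the stream. I am not saving checkpoints (I even deleted the blob container that was created). This is how I set it up: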
DateTime startDate = DateTime.UtcNow;
var epo = new EventProcessorOptions
{
    MaxBatchSize = 100,
    PrefetchCount = 100,
    ReceiveTimeOut = TimeSpan.FromSeconds(120),
    InitialOffsetProvider = (name) => startDate
};
Any guidance would be greatly appreciated.
I think this changed in version 2.0.0. Rajiv's code would now be:
var eventProcessorOptions = new EventProcessorOptions
{
    InitialOffsetProvider = (partitionId) => EventPosition.FromEnqueuedTime(DateTime.UtcNow)
};
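For comparison, here is a minimal sketch of a few other starting points that `EventPosition` offers, assuming the Microsoft.Azure.EventHubs and Microsoft.Azure.EventHubs.Processor packages at 2.0.0 or later. The `StartPositions` class and its method names are hypothetical, made up for illustration. Note that the provider is only consulted for partitions that have no checkpoint in the storage container.

```csharp
using System;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;

// Hypothetical helper class; not part of the SDK.
public static class StartPositions
{
    // Capture the time once so every partition starts from the same instant,
    // rather than from whenever its lease happens to be acquired.
    public static EventProcessorOptions FromFixedUtcTime(DateTime startUtc)
    {
        return new EventProcessorOptions
        {
            InitialOffsetProvider = partitionId => EventPosition.FromEnqueuedTime(startUtc)
        };
    }

    // Only events that arrive after the receiver connects.
    public static EventProcessorOptions FromEndOfStream()
    {
        return new EventProcessorOptions
        {
            InitialOffsetProvider = partitionId => EventPosition.FromEnd()
        };
    }

    // Everything still retained in the partition.
    public static EventProcessorOptions FromStartOfStream()
    {
        return new EventProcessorOptions
        {
            InitialOffsetProvider = partitionId => EventPosition.FromStart()
        };
    }
}
```

Usage would look like `var options = StartPositions.FromFixedUtcTime(DateTime.UtcNow);`, passed to `RegisterEventProcessorAsync`.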
Here is a sample block with fully qualified class names:
    private static async Task MainAsync(string[] args)
    {
        try{
            Console.WriteLine("Registering EventProcessor...");
            string AISEhConnectionStringEndpoint = Configuration["AISEhConnectionStringEndpoint"];
            string AISEhConnectionStringSharedAccessKeyName = Configuration["AISEhConnectionStringSharedAccessKeyName"];
            string AISEhConnectionStringSharedAccessKey = Configuration["AISEhConnectionStringSharedAccessKey"];
            string EhConnectionString = $"Endpoint={AISEhConnectionStringEndpoint};SharedAccessKeyName={AISEhConnectionStringSharedAccessKeyName};SharedAccessKey={AISEhConnectionStringSharedAccessKey}";
            string AISEhEntityPath = Configuration["AISEhEntityPath"];
            string AISEhConsumerGroupName = Configuration["AISEhConsumerGroupName"];
            string AISStorageContainerName = Configuration["AISStorageContainerName"];
            string AISStorageAccountName = Configuration["AISStorageAccountName"];
            string AISStorageAccountKey = Configuration["AISStorageAccountKey"];
            string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", AISStorageAccountName, AISStorageAccountKey);
            var eventProcessorHost = new Microsoft.Azure.EventHubs.Processor.EventProcessorHost(
                AISEhEntityPath,
                AISEhConsumerGroupName,
                EhConnectionString,
                StorageConnectionString,
                AISStorageContainerName);
            var options = new Microsoft.Azure.EventHubs.Processor.EventProcessorOptions
            {
                InitialOffsetProvider = (partitionId) => Microsoft.Azure.EventHubs.EventPosition.FromEnqueuedTime(DateTime.UtcNow)
            };
            // Registers the Event Processor Host and starts receiving messages
            await eventProcessorHost.RegisterEventProcessorAsync<GetEvents>(options);
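            // Wait forever without blocking a thread; as written, the unregister call below is never reached
            await Task.Delay(Timeout.Infinite);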
            // Disposes of the Event Processor Host
            await eventProcessorHost.UnregisterEventProcessorAsync();
        }
        catch(Exception ex)
        {
            Console.WriteLine(ex.Message);
            NLog.LogManager.GetCurrentClassLogger().Error(ex);
            throw;
        }
    }
}
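The `GetEvents` class registered above is not shown in the answer. A minimal sketch of what such a processor could look like, assuming it implements `IEventProcessor` from Microsoft.Azure.EventHubs.Processor and that event bodies are UTF-8 text (both are assumptions, not the author's actual class):

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;

// Hypothetical stand-in for the GetEvents class referenced in the answer.
public class GetEvents : IEventProcessor
{
    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine($"Opened partition {context.PartitionId}");
        return Task.CompletedTask;
    }

    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine($"Closed partition {context.PartitionId}: {reason}");
        return Task.CompletedTask;
    }

    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"Error on partition {context.PartitionId}: {error.Message}");
        return Task.CompletedTask;
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        if (messages == null)
        {
            return Task.CompletedTask;
        }

        foreach (var eventData in messages)
        {
            // Assumes the payload is UTF-8 text.
            string body = Encoding.UTF8.GetString(
                eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            Console.WriteLine(
                $"{eventData.SystemProperties.EnqueuedTimeUtc:o} [{context.PartitionId}] {body}");
        }

        // No checkpoint is written here. Calling context.CheckpointAsync() would persist
        // the position to blob storage, and later runs would resume from that checkpoint
        // instead of consulting InitialOffsetProvider.
        return Task.CompletedTask;
    }
}
```

Printing the enqueued time for each event is a quick way to confirm that the first events received are no older than the requested start time.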
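Here are my general settings, with secrets and exact addresses masked, to help with working this out, because I found doing so even less pleasant than pulling teeth: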
"AISEhConnectionStringEndpoint": "sb://<my bus address>.servicebus.windows.net/",
"AISEhConnectionStringSharedAccessKeyName": "<my key name>",
"AISEhConnectionStringSharedAccessKey": "<yeah nah>",
"AISEhEntityPath": "<Event Hub entity path>",
"AISEhConsumerGroupName":  "<consumer group name e.g $Default>",
"AISStorageContainerName":  "<storage container name>",
"AISStorageAccountName": "<storage account name>",
"AISStorageAccountKey": "<yeah nah>",
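I found that the checkpoint folder in the blob container still existed, and my application was picking it up and ignoring the date I set in EventProcessorOptions. After deleting the container, it started behaving as expected (honoring the UTC date).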
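If deleting the container by hand is inconvenient, the same reset can be scripted. A minimal sketch, assuming the newer Azure.Storage.Blobs package (not the storage library the host itself depends on) and that the host is stopped first. The `CheckpointReset` class and its method name are hypothetical:

```csharp
using System;
using System.Threading.Tasks;
using Azure.Storage.Blobs;

// Hypothetical helper; run it only while no EventProcessorHost is using the container.
public static class CheckpointReset
{
    public static async Task DeleteCheckpointContainerAsync(
        string storageConnectionString, string containerName)
    {
        var container = new BlobContainerClient(storageConnectionString, containerName);

        // Removes the leases and checkpoints stored in the container, so the next
        // run falls back to InitialOffsetProvider for every partition.
        var response = await container.DeleteIfExistsAsync();

        Console.WriteLine(response.Value
            ? $"Deleted container '{containerName}'."
            : $"Container '{containerName}' did not exist.");
    }
}
```

Container deletion is not instantaneous on the service side, so recreating a container with the same name immediately afterwards may fail for a short while. Waiting before restarting the host, or using a fresh container name, avoids that.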