该应用程序使用.NET 4.6.1和Microsoft.Azure.ServiceBus.EventProcessorHost nuget软件包v2.0.2及其依赖WindowsAzure.ServiceBus软件包v3.0.1来处理Azure事件中心消息.
该应用程序有一个实现IEventProcessor.当从ProcessEventsAsync方法抛出未处理的异常时,EventProcessorHost永远不会将这些消息重新发送到正在运行的实例IEventProcessor.(有趣的是,如果托管应用程序停止并重新启动,或者租约丢失并重新获得,它将重新发送.)
有没有办法强制将导致异常的事件消息重新发送EventProcessorHost给IEventProcessor实现?
本评论中提供了一个可能的解决方案,该问题几乎完全相同: IEventProcessor.ProcessEventsAsync中的Redeliver未处理的EventHub消息
该评论建议保留最后一个成功处理的事件消息的副本,并在发生异常时显式使用该消息进行检查点ProcessEventsAsync.但是,在实施和测试这样的解决方案后,EventProcessorHost仍然不会重新发送.实现非常简单:
private EventData _lastSuccessfulEvent;
public async Task ProcessEventsAsync(
PartitionContext context,
IEnumerable<EventData> messages)
{
try
{
await ProcessEvents(context, messages); // does actual processing, may throw exception
_lastSuccessfulEvent = messages
.OrderByDescending(ed => ed.SequenceNumber)
.First();
}
catch(Exception ex)
{
await context.CheckpointAsync(_lastSuccessfulEvent);
}
}
Run Code Online (Sandbox Code Playgroud)
此处提供部分日志示例:https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt
在以下方案中需要有关使用Azure事件中心的帮助.我认为消费者群体可能是这种情况的正确选择,但我无法在网上找到具体的例子.
以下是问题的粗略描述以及使用事件中心的建议解决方案(我不确定这是否是最佳解决方案.非常感谢您的反馈)

我有多个事件源可以生成大量事件数据(来自传感器的遥测数据),需要保存到我们的数据库中,并且应该并行执行一些分析,如运行平均值,最小值 - 最大值.
发送方只能将数据发送到单个端点,但事件中心应该使这些数据可供两个数据处理程序使用.
我正在考虑使用两个使用者组,第一个是工作者角色实例的集群,负责将数据保存到我们的键值存储,第二个消费者组将是一个分析引擎(可能与Azure流分析一起使用) ).
首先,我如何设置消费者群体,在发送者/接收者方面是否需要做些事情,以便所有消费者群体都能看到事件副本?
我确实在线阅读了很多示例,但是他们使用client.GetDefaultConsumerGroup();和/或让所有分区都由同一个工作者角色的多个实例处理.
对于我的场景,当触发事件时,它需要由两个不同的工作者角色并行处理(一个保存数据,另一个执行某些分析)
谢谢!
在我的EventHub处理器中,我得到了Microsoft.ServiceBus.Messaging.LeaseLostException的例外.
这个例外是什么意思?这个例外可能的根本原因是什么?
以下是堆栈跟踪:
在Microsoft.ServiceBus.Messaging.BlobLeaseManager.d__24.MoveNext()\ r \n ---从抛出异常的上一个位置的堆栈跟踪结束---\r \n在System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(任务任务)\ r \n at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(任务任务)\ r \n在Microsoft.ServiceBus.Messaging.BlobLeaseManager.d__25.MoveNext()\ r \n ---堆栈跟踪结束从抛出异常的先前位置---\r \n在System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(任务任务)\ r \n在System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(任务任务)\ r \n
在MyEventHub.EventProcessor`1.d__6.MoveNext()\ r \n\r \nMicrosoft.WindowsAzure.Storage.StorageException:\"远程服务器返回错误:(409)冲突.\":
at c:\ Program Files(x86)\ Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Executor\Executor.cs中的Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync [T](IAsyncResult result):line 60\r \n在Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<> c__DisplayClass4.b__3(IAsyncResult ar)在c:\ Program Files(x86)\ Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Util\AsyncExtensions.cs:第115行\ r \n ---从抛出异常的上一个位置开始的堆栈跟踪结束---\r \n在System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(任务任务)\ r \n at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(任务任务)\ r \n在Microsoft.ServiceBus.Messaging.BlobLeaseManager.d__24.MoveNext()\ r \n\r \n \nSystem.Net.WebException:\"远程服务器返回错误:(409)冲突.\":在Microsoft.WindowsAzure.Storage.Shared.Protocol.HttpResponseParsers.ProcessExpectedStatusCodeNoException [T](HttpStatusCo)de expectedStatusCode,HttpStatusCode actualStatusCode,T retVal,StorageCommandBase 1 cmd, Exception ex) in c:\\Program Files (x86)\\Jenkins\\workspace\\release_dotnet_master\\Lib\\Common\\Shared\\Protocol\\HttpResponseParsers.Common.cs:line 50\r\n
at Microsoft.WindowsAzure.Storage.Blob.CloudBlob.<>c__DisplayClass33.<RenewLeaseImpl>b__32(RESTCommand1 cmd,HttpWebResponse resp,Exception ex,OperationContext ctx)在c:\ Program Files(x86)\ Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Blob\CloudBlob.cs:line 3186\r \n在c:\ Program …