我正在使用EventProcessorHost和一个IEventProcessor类(调用它:MyEventProcessor)从EventHub接收事件.我通过在两台服务器上运行我的EPH,并使用相同的ConsumerGroup连接到Hub,但使用唯一的hostName(使用机器名称)将其扩展到两台服务器.
问题是:在白天/黑夜的随机时间,应用程序记录:
Exception information:
Exception type: ReceiverDisconnectedException
Exception message: New receiver with higher epoch of '186' is created hence current receiver with epoch '186' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used.
at Microsoft.ServiceBus.Common.ExceptionDispatcher.Throw(Exception exception)
at Microsoft.ServiceBus.Common.Parallel.TaskHelpers.EndAsyncResult(IAsyncResult asyncResult)
at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
Run Code Online (Sandbox Code Playgroud)
此异常与LeaseLostException同时发生,当它尝试检查点时,从MyEventProcessor的CloseAsync方法抛出.(由于ReceiverDisconnectedException,可能正在调用Close?)
我认为这是由于Event Hubs在扩展到多台机器时的自动租赁管理而发生的.但我想知道我是否需要做一些不同的事情以使其更干净地工作并避免这些例外?例如:有时代的东西?
事件中心不允许您存储超过 7(最多 30)天的消息。具有这些限制的 Azure 建议的 PaaS 事件溯源架构是什么?如果是事件中心 + 快照,如果我们需要以某种方式重建该状态会发生什么?另外,事件中心是对 KSQL/Spark Azure 流分析的回答吗?
使用Azure Service Bus实现基于强类型消息的路由的最简单方法是什么.
假设我们只有一个消费者并且正在使用服务总线队列,是否更容易为每种消息类型创建队列(在我们的例子中是事件消息),或者只是为所有消息创建一个队列并处理消费者的路由?
如果我们有多个消费者并且想要发布pub-sub消息,那么我们是应该为每种消息类型创建一个主题,为每个消息类型创建一个订阅,还是只为所有消息创建一个主题,然后在消费者上处理路由?
我们遇到了很多这些异常,在高峰流量期间向EventHubs发送事件:
"无法将事件发送到EventHub.例外:Microsoft.ServiceBus.Messaging.MessagingException:服务器无法处理请求;请重试该操作.如果问题仍然存在,请联系您的Service Bus管理员并提供跟踪ID." 或"无法将事件发送到EventHub.例外:System.TimeoutException:操作未在分配的时间内完成"
你可以在这里清楚地看到它:
正如您所看到的,当传入的消息超过400K事件/小时(或~270 MB /小时)时,我们得到了许多内部错误,服务器忙错误,失败请求.这不仅仅是一个短暂的问题.这显然与吞吐量有关.
我们的EH有32个分区,7天的消息保留和5个吞吐量单位.OperationTimeout设置为5分钟,我们使用默认的RetryPolicy.
我们还需要在这里调整一下吗?我们真的很关心EH的可扩展性.
谢谢
在我的EventHub处理器中,我得到了Microsoft.ServiceBus.Messaging.LeaseLostException的例外.
这个例外是什么意思?这个例外可能的根本原因是什么?
以下是堆栈跟踪:
在Microsoft.ServiceBus.Messaging.BlobLeaseManager.d__24.MoveNext()\ r \n ---从抛出异常的上一个位置的堆栈跟踪结束---\r \n在System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(任务任务)\ r \n at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(任务任务)\ r \n在Microsoft.ServiceBus.Messaging.BlobLeaseManager.d__25.MoveNext()\ r \n ---堆栈跟踪结束从抛出异常的先前位置---\r \n在System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(任务任务)\ r \n在System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(任务任务)\ r \n
在MyEventHub.EventProcessor`1.d__6.MoveNext()\ r \n\r \nMicrosoft.WindowsAzure.Storage.StorageException:\"远程服务器返回错误:(409)冲突.\":
at c:\ Program Files(x86)\ Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Executor\Executor.cs中的Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync [T](IAsyncResult result):line 60\r \n在Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<> c__DisplayClass4.b__3(IAsyncResult ar)在c:\ Program Files(x86)\ Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Util\AsyncExtensions.cs:第115行\ r \n ---从抛出异常的上一个位置开始的堆栈跟踪结束---\r \n在System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(任务任务)\ r \n at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(任务任务)\ r \n在Microsoft.ServiceBus.Messaging.BlobLeaseManager.d__24.MoveNext()\ r \n\r \n \nSystem.Net.WebException:\"远程服务器返回错误:(409)冲突.\":在Microsoft.WindowsAzure.Storage.Shared.Protocol.HttpResponseParsers.ProcessExpectedStatusCodeNoException [T](HttpStatusCo)de expectedStatusCode,HttpStatusCode actualStatusCode,T retVal,StorageCommandBase 1 cmd, Exception ex) in c:\\Program Files (x86)\\Jenkins\\workspace\\release_dotnet_master\\Lib\\Common\\Shared\\Protocol\\HttpResponseParsers.Common.cs:line 50\r\n
at Microsoft.WindowsAzure.Storage.Blob.CloudBlob.<>c__DisplayClass33.<RenewLeaseImpl>b__32(RESTCommand1 cmd,HttpWebResponse resp,Exception ex,OperationContext ctx)在c:\ Program Files(x86)\ Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Blob\CloudBlob.cs:line 3186\r \n在c:\ Program …
我想从Microsoft Azure EventHub获取事件数.我可以使用EventHubReceiver.Receive(maxcount)但它在大量大事件上很慢.
有NamespaceManager.GetEventHubPartition(..).EndSequenceNumber属性似乎正在做这个技巧,但我不确定它是否是正确的方法.
我正在处理EventHub的高吞吐量应用程序.根据文档,为了从单个发送方实现非常高的吞吐量,需要客户端批处理(不超过每个事件256 KB的限制).
使用Service Bus代理消息传递提高性能的最佳实践建议客户端批处理以实现性能改进.它描述了客户端批处理可用于队列或主题客户端,这使得能够在一段时间内延迟发送消息,然后它在一个批处理中传输消息.
EventHub客户端中是否提供客户端批处理?
作为安全产品的一部分,我拥有大规模云服务(azure worker 角色),它从事件中心读取事件,将它们分批处理到 ~2000 并存储在 blob 存储中。每个事件都有一个 MachineId(发送它的机器)。事件以随机顺序来自事件中心,我以随机顺序将它们存储在 blob 存储中。吞吐量高达 125K 事件/秒,每个事件约为 2K,因此我们有高达 250MB/秒的流量。我们有大约 100 万台机器...
稍后,另一个云服务下载 blob 并对事件运行一些检测逻辑。他按 MachineId 对事件进行分组,并尝试从机器时间线中了解某些内容
问题是今天来自同一台机器的事件被填充到不同的 blob。如果我可以通过它们的 MachineId 以某种方式对事件进行分组,并确保机器的某个时间窗口填充到相同的 blob,这将增加我可以在云中进行的检测。
我们确实将事件写入另一个 Map reduce 系统,并且在那里我们正在做很多复杂的检测,但那些当然具有高延迟。如果我可以在云中更好地对事件进行分组,我就可以实时捕获更多信息
我有什么技术可以帮助我吗?
提前致谢
我目前在 Azure 中设置了一个 EventHub 实例。它有5个分区。我想知道,如果有什么PartitionKey总是要成为之间的数字0,并n-1与n被分区数目。
我有以下代码:
private static async Task SendMessagesToEventHub(int numMessagesToSend)
{
var sender = eventHubClient.CreatePartitionSender("test1");
for (var i = 0; i < numMessagesToSend; i++)
{
try
{
var message = $"Message {i}";
Console.WriteLine($"Sending message: {message}");
await sender.SendAsync(new EventData(Encoding.UTF8.GetBytes(message)));
}
catch (Exception exception)
{
Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
}
await Task.Delay(10);
}
Console.WriteLine($"{numMessagesToSend} messages sent.");
}
Run Code Online (Sandbox Code Playgroud)
这会引发异常
指定的分区对于 EventHub 分区发送方或接收方无效。它应该在 0 到 4 之间。
在EventHub的文档中,他们是这样说的PartitionKey:
EventData 类具有 …