我一直在学习事件中心,只是希望得到确认或更正我对事件中心的看法?我习惯于利用重试,有毒消息,至少一次交付等等,以获得Azure Service Bus Queues和Topics给我的正常企业消息传递解决方案.似乎Event Hubs旨在提供一种非常高规模的不同工具,您必须放弃一些更高规模的"企业"功能.
我正确地考虑了这个吗?是否还需要考虑其他细节?我意识到事件中心和主题可能存在一些功能重叠,但我只想弄清楚如何考虑使用事件中心.
该应用程序使用.NET 4.6.1和Microsoft.Azure.ServiceBus.EventProcessorHost nuget软件包v2.0.2及其依赖WindowsAzure.ServiceBus软件包v3.0.1来处理Azure事件中心消息.
该应用程序有一个实现IEventProcessor
.当从ProcessEventsAsync
方法抛出未处理的异常时,EventProcessorHost
永远不会将这些消息重新发送到正在运行的实例IEventProcessor
.(有趣的是,如果托管应用程序停止并重新启动,或者租约丢失并重新获得,它将重新发送.)
有没有办法强制将导致异常的事件消息重新发送EventProcessorHost
给IEventProcessor
实现?
本评论中提供了一个可能的解决方案,该问题几乎完全相同: IEventProcessor.ProcessEventsAsync中的Redeliver未处理的EventHub消息
该评论建议保留最后一个成功处理的事件消息的副本,并在发生异常时显式使用该消息进行检查点ProcessEventsAsync
.但是,在实施和测试这样的解决方案后,EventProcessorHost
仍然不会重新发送.实现非常简单:
private EventData _lastSuccessfulEvent;
public async Task ProcessEventsAsync(
PartitionContext context,
IEnumerable<EventData> messages)
{
try
{
await ProcessEvents(context, messages); // does actual processing, may throw exception
_lastSuccessfulEvent = messages
.OrderByDescending(ed => ed.SequenceNumber)
.First();
}
catch(Exception ex)
{
await context.CheckpointAsync(_lastSuccessfulEvent);
}
}
Run Code Online (Sandbox Code Playgroud)
此处提供部分日志示例:https://gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt
我正在学习消息传递系统,但对这些术语感到困惑。
下面的所有消息传递系统都提供了具有不同功能集的服务之间的松散耦合。
queue
- FIFO,拉动机制,每个队列有 1 个消费者,但有任意数量的生产者?
message bus
- 发布/订阅模型,任意数量的消费者和任意数量的生产者处理消息?是Azure Service Bus
的实现message bus
?
event bus
- 发布/订阅模型,任意数量的消费者和任意数量的生产者处理事件?
就术语而言,人们是否可以互换使用message bus
和event bus
?
事件和消息有什么区别?在这种情况下,这些只是同义词吗?
event hub
- 发布/订阅模型,分区,重放,消费者可以将事件存储在外部存储中或接近实时数据分析。究竟什么是事件中心?
event grid
- 它可以用作事件中心的下游服务。它究竟做了哪些event hub
没有做的事情?
有人可以提供一些历史背景,说明每种技术如何演变为另一种技术,每种技术都与一些实际用例相关联吗?
我发现消息总线与消息队列有帮助
message-queue servicebus azureservicebus azure-eventhub azure-eventgrid
我正在使用EventProcessorHost和一个IEventProcessor类(调用它:MyEventProcessor)从EventHub接收事件.我通过在两台服务器上运行我的EPH,并使用相同的ConsumerGroup连接到Hub,但使用唯一的hostName(使用机器名称)将其扩展到两台服务器.
问题是:在白天/黑夜的随机时间,应用程序记录:
Exception information:
Exception type: ReceiverDisconnectedException
Exception message: New receiver with higher epoch of '186' is created hence current receiver with epoch '186' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used.
at Microsoft.ServiceBus.Common.ExceptionDispatcher.Throw(Exception exception)
at Microsoft.ServiceBus.Common.Parallel.TaskHelpers.EndAsyncResult(IAsyncResult asyncResult)
at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
Run Code Online (Sandbox Code Playgroud)
此异常与LeaseLostException同时发生,当它尝试检查点时,从MyEventProcessor的CloseAsync方法抛出.(由于ReceiverDisconnectedException,可能正在调用Close?)
我认为这是由于Event Hubs在扩展到多台机器时的自动租赁管理而发生的.但我想知道我是否需要做一些不同的事情以使其更干净地工作并避免这些例外?例如:有时代的东西?
在以下方案中需要有关使用Azure事件中心的帮助.我认为消费者群体可能是这种情况的正确选择,但我无法在网上找到具体的例子.
以下是问题的粗略描述以及使用事件中心的建议解决方案(我不确定这是否是最佳解决方案.非常感谢您的反馈)
我有多个事件源可以生成大量事件数据(来自传感器的遥测数据),需要保存到我们的数据库中,并且应该并行执行一些分析,如运行平均值,最小值 - 最大值.
发送方只能将数据发送到单个端点,但事件中心应该使这些数据可供两个数据处理程序使用.
我正在考虑使用两个使用者组,第一个是工作者角色实例的集群,负责将数据保存到我们的键值存储,第二个消费者组将是一个分析引擎(可能与Azure流分析一起使用) ).
首先,我如何设置消费者群体,在发送者/接收者方面是否需要做些事情,以便所有消费者群体都能看到事件副本?
我确实在线阅读了很多示例,但是他们使用client.GetDefaultConsumerGroup();
和/或让所有分区都由同一个工作者角色的多个实例处理.
对于我的场景,当触发事件时,它需要由两个不同的工作者角色并行处理(一个保存数据,另一个执行某些分析)
谢谢!
我想确保,如果我的eventhub客户端崩溃(当前是一个控制台应用程序),它只会选择尚未从eventhub获取的事件.实现这一目标的一种方法是利用抵消.但是,这(根据我的理解)要求客户端存储最新的偏移量(除了事件似乎不一定会触及SequenceNumber排序的ProcessEventsAsync方法的foreach循环).
另一种方法是使用检查点.我认为它们是使用提供的存储帐户凭据通过服务器(eventhub)保留的.它是否正确?
这是我目前使用的一些初步代码:
public class SimpleEventProcessor : IEventProcessor
{
private Stopwatch _checkpointStopWatch;
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
Task IEventProcessor.OpenAsync(PartitionContext context)
{
Console.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
_checkpointStopWatch = new Stopwatch();
_checkpointStopWatch.Start();
return Task.FromResult<object>(null);
}
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
// do something
}
//Call checkpoint every 5 minutes, so …
Run Code Online (Sandbox Code Playgroud) 是否有用于在测试或 CI 环境中代替 Azure 事件中心运行的 docker 映像?
我希望能够使用作为 docker 容器运行的模拟事件中心来建立完整的堆栈,人们是如何做到这一点的?我能想到的唯一其他选择是使用 ARM 模板来支持和拆除实际的云基础设施,这似乎有点浪费。
我正在构建一个.Net控制台应用程序来读取DocumentDB中的信息.控制台应用程序具有来自EventHub的数据,并在进入云时插入/更新最新数据.
我正在尝试从DocumentDB读取单个文档,我可以在请求文档之前确认文档存在.
if (DocumentDBRepository<DocumentDBItem>.DoesItemExist(name))
{
device = await DocumentDBRepository<DocumentDBItem>.GetItemAsync(name);
}
Run Code Online (Sandbox Code Playgroud)
我使用Microsoft的这个教程来构建用于访问DocumentDB记录的存储库,并且成功地使用了几乎所有方法.我可以更新/删除/查询数据库,但我无法读取单数项.
首先,它抛出了一个请求PartitionKey的异常.所以我修改了方法,将DB使用的PartitionKey添加到请求中.一旦我添加了PartitionKey,它就会抛出另一个异常,并显示一条消息"找不到资源"
public static async Task<T> GetItemAsync(string id)
{
try
{
RequestOptions options = new RequestOptions();
options.PartitionKey = new PartitionKey("DeviceId");
Document document = await client.ReadDocumentAsync(UriFactory.CreateDocumentUri(DatabaseId, CollectionId, id), options);
return (T)(dynamic)document;
}
catch (DocumentClientException e)
{
if (e.StatusCode == HttpStatusCode.NotFound)
{
return null;
}
else
{
throw;
}
}
}
Run Code Online (Sandbox Code Playgroud)
我已经修改了我的调用以使用"GetItemsAsyc"并获得IEnumerable的文档并获得列表中的第一项,但是为什么我可以使用教程中的所有其他方法但这一点仍然有效是没有意义的抛出异常说"资源未找到".
我得到的例外情况:
"Message: {\"Errors\":[\"Resource Not Found\"]}\r\nActivityId: e317ae66-6500-476c-b70e-c986c4cbf1d9, Request URI: /apps/e842e452-2347-4a8e-8588-2f5f8b4803ad/services/2c490552-a24d-4a9d-a786-992c07356545/partitions/0281cfdd-0c60-499f-be4a-289723a7dbf9/replicas/131336364114731886s"
Run Code Online (Sandbox Code Playgroud) 我希望使用Azure Event Hubs开始一个新项目.该项目的一部分是一组可以从任何开发机器(加上CI)运行的集成测试.过去,为此,我使用了本地仿真器(例如,Cosmos DB,存储,服务架构等).这样开发机器就不需要真正的云凭证并且具有隔离的环境.但是,事件中心似乎没有本地模拟器.
我错过了Event Hubs模拟器吗?如果没有,AMQP服务会是一个不错的选择吗?对于事件中心,有没有合适的近似值?
谢谢
我为此创建了一个反馈项目.如果这对你有用,请投票!
我有一些使用服务总线事件数据的代码,我怀疑我需要使用offset属性,因为目前我的程序(或似乎是)一遍又一遍地重新运行相同的事件中心数据.
我的代码如下:
public class EventHubListener : IEventProcessor
{
private static EventHubClient _eventHubClient;
private const string EhConnectionStringNoPath = "Endpoint=...";
private const string EhConnectionString = EhConnectionStringNoPath + ";...";
private const string EhEntityPath = "...";
public void Start()
{
_eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();
EventHubDescription eventHub = NamespaceManager.CreateFromConnectionString(EhConnectionStringNoPath).GetEventHub(EhEntityPath);
foreach (string partitionId in eventHub.PartitionIds)
{
defaultConsumerGroup.RegisterProcessor<EventHubListener>(new Lease
{
PartitionId = partitionId
}, new EventProcessorCheckpointManager());
Console.WriteLine("Processing : " + partitionId);
}
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (EventData …
Run Code Online (Sandbox Code Playgroud)