Azure事件中心和多个使用者组

inf*_*ity 19 azure publish-subscribe azure-eventhub

在以下方案中需要有关使用Azure事件中心的帮助.我认为消费者群体可能是这种情况的正确选择,但我无法在网上找到具体的例子.

以下是问题的粗略描述以及使用事件中心的建议解决方案(我不确定这是否是最佳解决方案.非常感谢您的反馈)

在此输入图像描述

我有多个事件源可以生成大量事件数据(来自传感器的遥测数据),需要保存到我们的数据库中,并且应该并行执行一些分析,如运行平均值,最小值 - 最大值.

发送方只能将数据发送到单个端点,但事件中心应该使这些数据可供两个数据处理程序使用.

我正在考虑使用两个使用者组,第一个是工作者角色实例的集群,负责将数据保存到我们的键值存储,第二个消费者组将是一个分析引擎(可能与Azure流分析一起使用) ).

首先,我如何设置消费者群体,在发送者/接收者方面是否需要做些事情,以便所有消费者群体都能看到事件副本?

我确实在线阅读了很多示例,但是他们使用client.GetDefaultConsumerGroup();和/或让所有分区都由同一个工作者角色的多个实例处理.

对于我的场景,当触发事件时,它需要由两个不同的工作者角色并行处理(一个保存数据,另一个执行某些分析)

谢谢!

cac*_*sar 32

TLDR:看起来很合理,只需使用CreateConsumerGroupIfNotExists使用不同的名称来创建两个Consumer Group.

消费者群体主要是一个概念,因此它们的工作方式取决于您的订阅者的实施方式.如您所知,从概念上讲,它们是一组一起工作的订阅者,以便每个组都接收所有消息,并且在理想情况下(不会发生)情况可能会消耗每个消息一次.这意味着每个使用者组 "使所有分区都由同一辅助角色的多个实例处理".你要这个.

这可以以不同方式实现.Microsoft提供了两种直接从事件中心消费消息的方法,以及使用Streaming Analytics等可能基于两种直接方式构建的选项.第一种方式是Event Hub Receiver,第二种方式是更高级别的事件处理器主机.

我没有直接使用Event Hub Receiver所以这个特别的评论是基于这些系统如何工作的理论和文档中的推测:虽然它们是从EventHubConsumerGroups 创建的,由于这些接收器不相互协调,因此它没有用处.如果您使用这些,您将需要(并且可以!)自己完成所有协调和提交偏移,这在某些情况下具有优势,例如在与计算聚合相同的事务中将偏移量写入事务DB.使用这些低级接收器使用相同的Azure使用者组的不同逻辑使用者群体可能不应该(规范性而非实用的建议)特别成问题,但是如果要么重要或者您更改为EventProcessorHosts,则应使用不同的名称.

现在转到更有用的信息,EventProcessorHosts可能建立在EventHubReceivers之上.它们是更高级别的东西,并且支持使多台机器作为逻辑消费者组一起工作.下面我已经从我的代码中包含了一个轻微编辑的片段,它使得一个EventProcessorHost在解释一些选择时留下了一堆注释.

//We need an identifier for the lease. It must be unique across concurrently 
//running instances of the program. There are three main options for this. The 
//first is a static value from a config file. The second is the machine's NETBIOS
//name ie System.Environment.MachineName. The third is a random value unique per run which
//we have chosen here, if our VMs have very weak randomness bad things may happen.

string hostName = Guid.NewGuid().ToString();

//It's not clear if we want this here long term or if we prefer that the Consumer 
//Groups be created out of band. Nor are there necessarily good tools to discover 
//existing consumer groups.
NamespaceManager namespaceManager = 
    NamespaceManager.CreateFromConnectionString(eventHubConnectionString);
EventHubDescription ehd = namespaceManager.GetEventHub(eventHubPath);
namespaceManager.CreateConsumerGroupIfNotExists(ehd.Path, consumerGroupName);

host = new EventProcessorHost(hostName, eventHubPath, consumerGroupName, 
    eventHubConnectionString, storageConnectionString, leaseContainerName);
//Call something like this when you want it to start
host.RegisterEventProcessorFactoryAsync(factory)
Run Code Online (Sandbox Code Playgroud)

您会注意到我告诉Azure创建一个新的Consumer Group如果它不存在,如果没有,您将收到一条可爱的错误消息.老实说,我不知道这是什么目的,因为它不包括跨实例需要相同的存储连接字符串,以便EventProcessorHost的协调(并且可能是提交)正常工作.

在这里,我提供了一张来自Azure存储资源管理器的图片,租用了我在11月试验的消费者群体的租约和可能的偏差.请注意,虽然我有一个testhub和一个testhub-testcg容器,但这是由于手动命名它们.如果它们在同一个容器中,那么就像"$ Default/0"vs"testcg/0". 活动中心租赁

如您所见,每个分区有一个blob.我的假设是这些blob用于两件事.第一个是用于在实例之间分配分区的Blob租约,请参见此处,第二个是在已提交的分区中存储偏移量.

消费实例不是将数据推送到消费者组,而是要求存储系统在一个分区中的某个偏移处获取数据.EventProcessorHosts是一个很好的高级方式,拥有一个逻辑消费者组,每个分区一次只能被一个消费者读取,并且不会忘记逻辑消费者组在每个分区中所取得的进展.

请记住,每个分区的吞吐量都是经过测量的,这样如果您最大限度地减少了入口,那么您只能拥有两个完全符合速度的逻辑消费者.因此,您需要确保有足够的分区和吞吐量单位,您可以:

  1. 读取您发送的所有数据.
  2. 如果由于问题而落后几个小时,请在24小时保留期内赶上.

总之:消费者群体是您所需要的.您阅读的使用特定使用者组的示例很好,在每个逻辑使用者组中使用Azure使用者组的相同名称,并且不同的逻辑使用者组使用不同的名称.

我还没有使用过Azure流分析,但至少在预览版中,您只能使用默认的使用者群组.因此,不要将默认的使用者组用于其他内容,如果您需要两个单独的Azure Stream Analytics,您可能需要做一些令人讨厌的事情.但它很容易配置!