我正在使用Azure Service Bus来管理带有webrole和workerrole的消息.
我需要知道如何在不使用循环的情况下立即从队列中获取多条消息.
servicebus azure azure-worker-roles azure-queues azureservicebus
我需要在发布服务器和订阅服务器之间建立双向通信.这是为了方便前端MVC3应用程序使用关联过滤器定义订阅,然后将消息放到主题上.最后,MVC3控制器调用SubscriptionClient上的BeginReceive(),并等待响应.
问题似乎是创建和删除这些Subscription对象.开销很大,并且会使应用程序变慢.这并不是说要解决的各种限制,例如主题上的订阅数量不超过2000个.
在发布者和订阅者之间建立这种双向通信的最佳实践是什么?我们希望MVC3应用程序发布消息,然后等待对该确切消息的响应(通过CorrelationId属性和CorrelationFilter).我们已经缓存了NamespaceManager和MessagingFactory,因为它们在资源方面也非常昂贵,并且还因为我们被告知Service Bus使用显式配置模型,我们需要在角色启动期间预先创建大部分这些内容.
因此,这给我们带来了将请求与响应相关联的挑战,以及创建和删除订阅的巨大开销.还有什么更好的做法?我们应该保留SubscriptionClient的缓存,并每次交换过滤器吗?其他人都做了什么?我需要通过Web角色群集获得每秒5到10,000个MVC3请求的请求吞吐量.我们已经在使用AsyncController并在SubscriptionClient上使用异步BeginReceive().它似乎是数以千计的订阅的创建和删除,此时系统正在窒息.
UPDATE1: 根据此处提供的好建议,我们更新了此解决方案,以便在每个Web角色实例上保留SubscriptionClient对象的缓存.此外,我们已迁移到面向MessageSession的方法.
但是,这仍然没有缩放.似乎AcceptMessageSession()是一个非常昂贵的操作.MessageSession对象是否也应该被缓存并重新使用?每个打开的MessageSession对象是否都使用与服务总线的连接?如果是这样,这是否与订阅的并发连接配额相对应?
非常感谢.我想我们到了那里.Web上的大多数示例代码显示:在客户端上创建Topic(),然后是CreateSubscription(),然后是CreateSubscriptionClient(),然后是BeginReceive(),然后拆除所有对象.我只能说,如果你在现实生活中这样做,你的服务器就会被粉碎,你最快就能连接起来了.
我们需要通过这个东西每秒发出数千个请求,很明显这些对象必须被高速缓存和重用.那么,MessageSession还有另一个要缓存的项目吗?我将获得有趣的缓存,因为我们必须实现一个引用计数机制,其中一次只能给出一个对MessageSession的引用,因为这是针对http请求特定的请求/响应,我们不能有其他订阅者同时使用MessageSession对象.
UPDATE2: 好的,将MessageSession缓存以供重用是不可行的,因为它们只与订阅时的LockDuration一样长.这是一个无赖,因为最大LockDuration是5分钟.这些似乎是短期的pub/sub,而不是长期运行的分布式进程.看起来我们需要回到轮询Azure表.
摘要/评论 我们试图建立Service Bus,因为它具有规模潜力及其持久性和交付语义.但是,似乎有些情况下,它们之间的大量请求/响应不适合它.发布部分工作得很好,并且在后端拥有竞争的消费者是很好的,但是有一个前端请求等待定义的单一消费者响应,根本不能很好地扩展,因为MessageSessions需要太长时间才能完成通过AcceptMessageSession()或BeginAcceptMessageSession()创建,因为它们不适合缓存.
如果有人有另一种观点,我很乐意听到.
servicebus azure azure-web-roles asp.net-mvc-3 azureservicebus
像AKKA.net这样的演员模型带来很多好处,如可伸缩性,反应性,内存缓存等...当我尝试将AKKA与Azure服务总线队列进行比较时,我看到了几乎相同的主要好处Azure服务总线,但内存缓存的好处除外.
在生产环境中,AKKA需要具有更多内存的多个VM,处理能力以处理内存中的数百万个actor.对于Azure Service Bus Queues,不需要强大的主机.即使我们使用演员模型,也不需要进行监督或创建演员系统来管理数百万演员.Azure Service Bus可自动实现可伸缩性.
从长远来看,我认为Azure Service Bus Queues具有成本效益.随着负载的增加,无需IT管理员进行管理.对于多核,也不需要功能强大的系统.
AKKA演员模型是否适用于具有多核系统的本地数据中心,并且在考虑成本效益时不适合可以使用Azure服务的应用程序?
我们遇到了很多这些异常,在高峰流量期间向EventHubs发送事件:
"无法将事件发送到EventHub.例外:Microsoft.ServiceBus.Messaging.MessagingException:服务器无法处理请求;请重试该操作.如果问题仍然存在,请联系您的Service Bus管理员并提供跟踪ID." 或"无法将事件发送到EventHub.例外:System.TimeoutException:操作未在分配的时间内完成"
你可以在这里清楚地看到它:
正如您所看到的,当传入的消息超过400K事件/小时(或~270 MB /小时)时,我们得到了许多内部错误,服务器忙错误,失败请求.这不仅仅是一个短暂的问题.这显然与吞吐量有关.
我们的EH有32个分区,7天的消息保留和5个吞吐量单位.OperationTimeout设置为5分钟,我们使用默认的RetryPolicy.
我们还需要在这里调整一下吗?我们真的很关心EH的可扩展性.
谢谢
我花了几天时间测试MassTransit 3.1.2,以查看是否可以在应用程序中将它与Azure Service Bus一起使用。
我使用MassTransit.AzureServiceBus(3.1.2)通过两个控制台应用程序制作了一个示例:一个发布者和一个订阅者。
它运作良好。当我启动应用程序时,实体(队列,主题,订阅)将自动在Azure上的命名空间上创建。
当您测试事物但在生产中时,这很好,我不希望该应用程序被允许创建实体。我们想预先创建它们。
为此,我认为使用仅具有“发送”或“监听”权限的SAS策略连接到总线是一个好主意(在使用具有“管理”权限的名称空间策略之前)。
现在,我在这一点上苦苦挣扎,无法正常工作,总是遇到401错误如果我不使用具有“管理”权限的策略,则必须对此操作要求管理声明。
我尝试直接在名称空间或实体上设置策略,但没有成功。
之后,我分析了堆栈跟踪异常(用[...]省略了无用的部分):
System.UnauthorizedAccessException: Le serveur distant a retourné une erreur : (401) Non autorisé. Manage claim is required for this operation. TrackingId:2ca420e3-aac6-467c-bacb-6e051dbc3e39_G47,TimeStamp:1/29/2016 11:20:41 PM ---> System.Net.WebException: Le serveur distant a retourné une erreur : (401) Non autorisé.
à System.Net.HttpWebRequest.EndGetResponse(IAsyncResult asyncResult)
à Microsoft.ServiceBus.Messaging.ServiceBusResourceOperations.GetAsyncResult`1.<GetAsyncSteps>b__3c(GetAsyncResult`1 thisPtr, IAsyncResult r)
à Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
--- Fin de la trace de la pile d'exception interne --- …
Run Code Online (Sandbox Code Playgroud) 在MS文档之后,从订阅接收消息并不困难.但是,如果我希望我的应用程序每次发布新消息时都会收到消息 - 持续轮询.因此,SubscriptionClient类的OnMessage()方法.
MS文档说: " ......当调用OnMessage时,客户端启动内部消息泵,不断轮询队列或订阅.此消息泵包含一个发出Receive()调用的无限循环.如果调用超时,它发出下一个Receive()调用.... "
但是当应用程序运行时,调用OnMessage()方法时,只接收最新消息.发布新消息时,常量轮询似乎不起作用.在尝试了许多不同的方法后,我可以使这项工作的唯一方法是让应用程序在接收到新消息时做出反应,将代码放入具有无限循环的单独任务中.在如此多的层面上,这似乎完全错了!(见下面的代码).
任何人都可以帮我修正我的代码或发布一个工作样本来完成相同的功能而不需要循环吗?谢谢!
public void ReceiveMessageFromSubscription(string topicName, string subscriptionFilter)
{
var newMessage = new MessageQueue();
int i = 0;
Task listener = Task.Factory.StartNew(() =>
{
while (true)
{
SubscriptionClient Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);
Dictionary<string, string> retrievedMessage = new Dictionary<string, string>();
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = false;
options.AutoRenewTimeout = TimeSpan.FromMinutes(1);
Client.OnMessage((message) =>
{
try
{
retrievedMessage.Add("messageGuid", message.Properties["MessageGuid"].ToString());
retrievedMessage.Add("instanceId", message.Properties["InstanceId"].ToString());
retrievedMessage.Add("pId", message.Properties["ProcessId"].ToString()); …
Run Code Online (Sandbox Code Playgroud) 我有一个WebJob,只要ServiceBus队列项出现就会被触发
public static void ProcessQueueMessage(
[ServiceBusTrigger("%ServiceBusHighPriorityQueueName%")] BrokeredMessage message)
Run Code Online (Sandbox Code Playgroud)
一些网站声明如下
我假设ServiceBus队列触发器处理程序是"触发"而不是"连续",那么在设置webjob-publish-settings.json参数runMode时应该使用什么?
由于它被触发,AppService仍然需要是AlwaysOn吗?
我希望能够控制Azure功能在运行时读取的服务总线队列或订阅的名称.
使用WebJobs(Azure Functions基于),您可以通过实现和配置自定义来执行此操作INameResolver
,请参阅:如何为Web作业处理创建基于配置的队列名称?
但是,使用Azure功能,我无权连接JobHostConfiguration
此自定义解析程序.
我还可以使用INameResolver
,如果是这样的话怎么样?
azure azureservicebus azure-functions azure-functions-runtime
我有一个用Python编写的Azure函数,该函数具有Service Bus(主题)输出绑定。该功能由另一个队列触发,我们处理blobl存储中的某些文件,然后将另一个消息放入队列。
我的function.json文件如下所示:
{
"bindings": [
{
"type": "serviceBus",
"connection": "Omnibus_Input_Send_Servicebus",
"name": "outputMessage",
"queueName": "validation-output-queue",
"accessRights": "send",
"direction": "out"
}
],
"disabled": false
}
Run Code Online (Sandbox Code Playgroud)
在我的函数中,我可以像这样将消息发送到另一个队列:
with open(os.environ['outputMessage'], 'w') as output_message:
output_message.write('This is my output test message !')
Run Code Online (Sandbox Code Playgroud)
一切正常。现在,我想向主题发送消息。我使用创建了一个订阅,SQLFilter
并且需要将一些自定义属性设置为BrokeredMessage
。
从python的azure sdk中,我发现可以添加类似的自定义属性(我已经使用pip安装了azure模块):
from azure.servicebus import Message
sent_msg = Message(b'This is the third message',
broker_properties={'Label': 'M3'},
custom_properties={'Priority': 'Medium',
'Customer': 'ABC'}
)
Run Code Online (Sandbox Code Playgroud)
我的新function.json文件如下所示:
{
"bindings": [
{
"type": "serviceBus",
"connection": "Omnibus_Input_Send_Servicebus",
"name": "outputMessage",
"topicName": "validation-output-topic",
"accessRights": …
Run Code Online (Sandbox Code Playgroud) 我正在使用python语言来使用Azure总线服务队列发送和接收消息。
使用以下代码从队列中删除消息时,出现“提供的锁无效。该锁已过期,或者消息已从队列中删除”。
sbs.delete_queue_message('taskqueue',5,'ef4e2189-bfef-42ac-ba09-7fd20287f6a9')sbs.delete_queue_message('taskqueue','SequenceNumber','LockToken')
from azure.servicebus.control_client import ServiceBusService, Message, Topic, Rule, DEFAULT_RULE_NAME
key_name = '###############' # SharedAccessKeyName from Azure portal
key_value = '####################' # SharedAccessKey from Azure portal
service_namespace = '###########'
sbs = ServiceBusService(service_namespace,shared_access_key_name=key_name,shared_access_key_value=key_value)
msg = sbs.receive_queue_message('taskqueue')
sbs.delete_queue_message('taskqueue',5,'ef4e2189-bfef-42ac-ba09-7fd20287f6a9')
Run Code Online (Sandbox Code Playgroud) azureservicebus ×10
azure ×9
c# ×3
python ×2
servicebus ×2
akka ×1
akka.net ×1
azure-queues ×1
masstransit ×1