我一直在研究Azure服务总线队列(非 Azure存储队列).我读过的所有细节都表明它支持FIFO语义,但仅限于"消息传递会话"的上下文.问题是我似乎无法找到有关Azure上下文中究竟是什么的任何信息.这是一个WCF构造,还是特定于Azure Service Bus的东西?我认为它与本地交易无关,但我并非百分之百确定.
任何指针都会非常有用.谢谢!
我通过.net SDK使用Azure ServiceBus队列.OnMessageHandler/OnMessageOptions上有一个名为"AutoRenewTimeout"的标志,但似乎对这个值的实际含义感到困惑.
在这里的官方文档https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.onmessageoptions.aspx它建议AutoRenewTimeout应该大于队列锁定.
获取或设置自动续订锁定的最长持续时间.该值应大于最长的消息锁定持续时间; 例如,LockDuration属性.
这似乎表明AutoRenewTimeout或多或少是处理消息所需的最长时间.例如,如果您的锁定持续时间为1分钟,并且自动续订时间为5分钟,则该消息将在放弃之前更新总共5次并再次在队列中显示.还有其他StackOverflow答案确认这种情况,例如/sf/answers/2523598591/
要处理长消息处理,您应该设置AutoRenewTimeout == 10分钟(在您的情况下).这意味着每次LockDuration过期时,锁定将在这10分钟内更新.
因此,例如,如果您的LockDuration为3分钟且AutoRenewTimeout为10分钟,则每3分钟锁定将自动续订(3分钟,6分钟和9分钟后),并且自消息消耗后12分钟后锁定将自动释放.
然而,在更多的研究中,我偶然发现了一条旧的推文(https://twitter.com/clemensv/status/649940087267041284),看起来是微软消息传递的首席架构师.在这条推文中,似乎暗示AutoRenewTimeout是调用"RenewLock"方法的间隔.
它是在回调处于活动状态时在消息上调用https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.renewlock.aspx的时间间隔
因此,例如,如果你的锁定是1分钟,AutoRenewTimeout应该是30秒,以确保在释放之前更新消息锁定.
在我自己的测试中,我倾向于前者是正确的,但推文让我怀疑的事实可能我不知道充分利用AutoRenewTimeout
最新的sdk (Azure.Messaging.ServiceBus 7.0.1)似乎没有选项将自定义(用户)属性添加到消息(即:用于过滤主题子)。现在有人知道如何做到这一点吗?异步发送消息
我有一个围绕Azure Service Bus代码的基本包装器,我们将在一个辅助角色中使用它.这ServiceBusClient将每个工人的角色运行时进行实例化; 然后用来访问队列,直到没有剩余的项目进行枚举.
public class ServiceBusClient : IDisposable, IServiceBusClient
{
private const int DEFAULT_WAIT_TIME_IN_SECONDS = 120;
private const string SERVICE_BUS_CONNECTION_STRING_KEY = "service.bus.connection.string";
private readonly MessagingFactory _messagingFactory;
private readonly NamespaceManager _namespaceManager;
private readonly QueueClient _queueClient;
private readonly ISettingsManager _settingsManager;
public ServiceBusClient(ISettingsManager settingsManager, string queueName)
{
_settingsManager = settingsManager;
var connectionString = _settingsManager.GetSetting<string>(SERVICE_BUS_CONNECTION_STRING_KEY);
_namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
_messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString);
_queueClient = GetOrCreateQueue(queueName);
}
public void Dispose()
{
_messagingFactory.Close();
}
public BrokeredMessage ReceiveTopMessage()
{
return _queueClient.Receive(TimeSpan.FromSeconds(DEFAULT_WAIT_TIME_IN_SECONDS));
}
public void SendMessage(object …Run Code Online (Sandbox Code Playgroud) 我知道可以从服务总线队列接收消息,如:
public static void ProcessQueueMessage([ServiceBusTrigger("inputqueue")] string message, TextWriter logger)
Run Code Online (Sandbox Code Playgroud)
但是还有一种通过触发属性接收会话的方法吗?像ServiceBusSessionTrigger这样的东西?
通常人们会接受这样的会话:
var session = queueClient.AcceptMessageSession();
Run Code Online (Sandbox Code Playgroud)
但我更喜欢WebJob SDK来处理一次可以处理多个会话的方式.
编辑: 似乎目前不支持此功能:请参阅github了解功能请求
c# azureservicebus azure-servicebus-queues azure-webjobs azure-webjobssdk
我正在使用Azure Service Bus主题和订阅.它用于在整个应用程序中发送控制消息.消息侦听器(订阅者)以worker角色运行,他们正在拾取消息并处理请求.即使有多个侦听器同时运行,总线中的每条消息也只能被拾取一次.
使用服务总线没有问题; 但是,我们在本地调试/测试应用程序时遇到了一些问题.我们有2个服务总线,一个用于云,一个用于本地调试.现在,如果多个人同时调试应用程序,则只有一个系统(随机)选择该消息.这是预期的行为,但它在调试时会造成麻烦.
有什么办法可以将本地仿真器用于服务总线吗?我做了一些研究,但我找不到任何可靠的解决方案.我们如何单独调试应用程序?
azure azureservicebus brokeredmessage azure-servicebus-topics
我正在尝试将Azure Service Bus与.NET Core一起使用.显然此刻,这种情绪很糟糕.我尝试了以下路线:
然后,我转向REST.
https://azure.microsoft.com/en-gb/documentation/articles/service-bus-brokered-tutorial-rest/是一个很好的开始(虽然没有对.NET Core的RestSharp支持,并且出于某种原因,官方SDK似乎没有涵盖REST客户端 - 没有Swagger def,没有AutoRest客户端等).虽然这个蹩脚的示例将字符串连接到XML而不进行编码,但它涵盖了一小部分功能.
所以我决定寻找REST文档.有两个部分,"经典"REST和REST.普通的新REST不支持实际发送和接收消息(......嗯?).我厌恶使用标有"经典"的旧技术,除非我能理解它是什么 - 当然,文档在这里没有帮助.它还使用XML和ATOM而不是JSON.我不知道为什么.
额外奖励:REST API文档中链接的示例,例如来自https://msdn.microsoft.com/en-US/library/azure/hh780786.aspx,不再存在.
是否有任何可行的方法,任何人都可以使用Azure Service Bus和.NET Core读取/写入主题/订阅消息?
我想测试我使用 QueueClient 注册的消息处理程序回调queueClient.RegisterMessageHandler(MyCallBack, messageHandlerOptions)。
public Task MyCallBack(Message msg, CancellationToken token)
{
// Read msg. Do something
// Since everything ran fine, complete the msg.
await _client.CompleteAsync(msg.SystemProperties.LockToken);
}
Run Code Online (Sandbox Code Playgroud)
现在作为我的单元测试的一部分,我调用MyCallBack. 因为我传递了一个有效的消息。我期待client.CompleteAsync()被召唤。但是测试会抛出异常。
System.InvalidOperationException: Operation is not valid due to the current state of the object.
Run Code Online (Sandbox Code Playgroud)
这是因为,msg.SystemProperties.LockToken是未设置(这是因为消息没有被实际上从一个队列中与客户端读取ReceiveMode.PeekLock模式)。
有没有办法设置/模拟它,以便我可以使用虚拟字符串作为令牌运行我的测试?
PS:我知道我可以msg.SystemProperties.IsLockTokenSet在实际访问 LockToken 字段之前检查;但即使在那种情况下,如果_client.CompleteAsync()被调用,我也永远无法进行单元测试。
我正在编写一个 Azure 函数来获取 Azure 服务总线中的消息。我想手动处理任何异常("autoCompleteMessages": false)无法弄清楚如何将完整或放弃发送回服务队列。
尝试过选项1:
[FunctionName("SBQ_F1_VC")]
public static async Task Run([ServiceBusTrigger("sbqfn1", Connection = "BrnlTest1_SERVICEBUS")]
ServiceBusReceivedMessage msg, ILogger log)
{
//.....
if(!Int32.TryParse(msg.ApplicationProperties.GetValueOrDefault("vid").ToString(), out vid))
{ await using ServiceBusClient client = new ServiceBusClient(Environment.GetEnvironmentVariable("BrnlTest1_SERVICEBUS"));
ServiceBusReceiver msgRcvr = client.CreateReceiver(Environment.GetEnvironmentVariable("queueName"), new ServiceBusReceiverOptions());
//await msgRcvr.RenewMessageLockAsync(msg);
await msgRcvr.AbandonMessageAsync(msg); //vid = 0;
}
//.....
}
Run Code Online (Sandbox Code Playgroud)
错误选项 1
System.Private.CoreLib: Exception while executing function: SBQ_F1_VC. Azure.Messaging.ServiceBus: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or …Run Code Online (Sandbox Code Playgroud) QueueClient.OnMessage方法是否总是在不同的线程上执行回调参数?
我假设如果MaxConcurrentCalls设置为10,那么queueClient将最多启动10个线程来并行处理消息.如果传入MaxConcurrentConnection值为1或者它是否在当前线程上执行,是否会创建新线程?
我的实际问题:
在一个Worker角色中我想处理多个队列,但是它们都会同时处理.例如
_queueClient1.OnMessage(x =>
{
// Do something
}, new OnMessageOptions { MaxConcurrentCalls = 1});
_queueClient2.OnMessage(x =>
{
// Do something
}, new OnMessageOptions { MaxConcurrentCalls = 1 });
_queueClient3.OnMessage(x =>
{
// Do something
}, new OnMessageOptions { MaxConcurrentCalls = 1 });
_queueClient4.OnMessage(x =>
{
// Do something
}, new OnMessageOptions { MaxConcurrentCalls = 1 });
Run Code Online (Sandbox Code Playgroud)
这是否会导致每个回调并行执行,以便_queueClient4回调不等待_queueClient2完成才能执行?