标签: azureservicebus

Azure Service Bus和消息传递会话

我一直在研究Azure服务总线队列( Azure存储队列).我读过的所有细节都表明它支持FIFO语义,但仅限于"消息传递会话"的上下文.问题是我似乎无法找到有关Azure上下文中究竟是什么的任何信息.这是一个WCF构造,还是特定于Azure Service Bus的东西?我认为它与本地交易无关,但我并非百分之百确定.

任何指针都会非常有用.谢谢!

azure azure-queues azureservicebus

12
推荐指数
1
解决办法
1万
查看次数

Azure ServiceBus AutoRenewTimeout

我通过.net SDK使用Azure ServiceBus队列.OnMessageHandler/OnMessageOptions上有一个名为"AutoRenewTimeout"的标志,但似乎对这个值的实际含义感到困惑.

在这里的官方文档https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.onmessageoptions.aspx它建议AutoRenewTimeout应该大于队列锁定.

获取或设置自动续订锁定的最长持续时间.该值应大于最长的消息锁定持续时间; 例如,LockDuration属性.

这似乎表明AutoRenewTimeout或多或少是处理消息所需的最长时间.例如,如果您的锁定持续时间为1分钟,并且自动续订时间为5分钟,则该消息将在放弃之前更新总共5次并再次在队列中显示.还有其他StackOverflow答案确认这种情况,例如/sf/answers/2523598591/

要处理长消息处理,您应该设置AutoRenewTimeout == 10分钟(在您的情况下).这意味着每次LockDuration过期时,锁定将在这10分钟内更新.

因此,例如,如果您的LockDuration为3分钟且AutoRenewTimeout为10分钟,则每3分钟锁定将自动续订(3分钟,6分钟和9分钟后),并且自消息消耗后12分钟后锁定将自动释放.

然而,在更多的研究中,我偶然发现了一条旧的推文(https://twitter.com/clemensv/status/649940087267041284),看起来是微软消息传递的首席架构师.在这条推文中,似乎暗示AutoRenewTimeout是调用"RenewLock"方法的间隔.

它是在回调处于活动状态时在消息上调用https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.renewlock.aspx的时间间隔

因此,例如,如果你的锁定是1分钟,AutoRenewTimeout应该是30秒,以确保在释放之前更新消息锁定.

在我自己的测试中,我倾向于前者是正确的,但推文让我怀疑的事实可能我不知道充分利用AutoRenewTimeout

c# azure azureservicebus

12
推荐指数
1
解决办法
3711
查看次数

如何使用 Azure.Messaging.ServiceBus 库添加自定义属性?

最新的sdk (Azure.Messaging.ServiceBus 7.0.1)似乎没有选项将自定义(用户)属性添加到消息(即:用于过滤主题子)。现在有人知道如何做到这一点吗?异步发送消息

c# azure azureservicebus

12
推荐指数
2
解决办法
1万
查看次数

Azure Service Bus客户端连接持久性

我有一个围绕Azure Service Bus代码的基本包装器,我们将在一个辅助角色中使用它.这ServiceBusClient将每个工人的角色运行时进行实例化; 然后用来访问队列,直到没有剩余的项目进行枚举.

public class ServiceBusClient : IDisposable, IServiceBusClient
{
    private const int DEFAULT_WAIT_TIME_IN_SECONDS = 120;

    private const string SERVICE_BUS_CONNECTION_STRING_KEY = "service.bus.connection.string";

    private readonly MessagingFactory _messagingFactory;

    private readonly NamespaceManager _namespaceManager;

    private readonly QueueClient _queueClient;

    private readonly ISettingsManager _settingsManager;

    public ServiceBusClient(ISettingsManager settingsManager, string queueName)
    {
        _settingsManager = settingsManager;

        var connectionString = _settingsManager.GetSetting<string>(SERVICE_BUS_CONNECTION_STRING_KEY);

        _namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
        _messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString);

        _queueClient = GetOrCreateQueue(queueName);
    }

    public void Dispose()
    {
        _messagingFactory.Close();
    }

    public BrokeredMessage ReceiveTopMessage()
    {
        return _queueClient.Receive(TimeSpan.FromSeconds(DEFAULT_WAIT_TIME_IN_SECONDS));
    }

    public void SendMessage(object …
Run Code Online (Sandbox Code Playgroud)

c# client azure azureservicebus

11
推荐指数
1
解决办法
7712
查看次数

用于会话的Azure WebJob ServiceBusTrigger

我知道可以从服务总线队列接收消息,如:

public static void ProcessQueueMessage([ServiceBusTrigger("inputqueue")] string message, TextWriter logger)
Run Code Online (Sandbox Code Playgroud)

但是还有一种通过触发属性接收会话的方法吗?像ServiceBusSessionTrigger这样的东西?

通常人们会接受这样的会话:

var session = queueClient.AcceptMessageSession();
Run Code Online (Sandbox Code Playgroud)

但我更喜欢WebJob SDK来处理一次可以处理多个会话的方式.

编辑: 似乎目前不支持此功能:请参阅github了解功能请求

c# azureservicebus azure-servicebus-queues azure-webjobs azure-webjobssdk

11
推荐指数
1
解决办法
1498
查看次数

在本地使用Azure Service Bus

我正在使用Azure Service Bus主题和订阅.它用于在整个应用程序中发送控制消息.消息侦听器(订阅者)以worker角色运行,他们正在拾取消息并处理请求.即使有多个侦听器同时运行,总线中的每条消息也只能被拾取一次.

使用服务总线没有问题; 但是,我们在本地调试/测试应用程序时遇到了一些问题.我们有2个服务总线,一个用于云,一个用于本地调试.现在,如果多个人同时调试应用程序,则只有一个系统(随机)选择该消息.这是预期的行为,但它在调试时会造成麻烦.

有什么办法可以将本地仿真器用于服务总线吗?我做了一些研究,但我找不到任何可靠的解决方案.我们如何单独调试应用程序?

azure azureservicebus brokeredmessage azure-servicebus-topics

11
推荐指数
2
解决办法
4615
查看次数

Azure Service Bus - 主题,消息 - 使用.NET Core

我正在尝试将Azure Service Bus与.NET Core一起使用.显然此刻,这种情绪很糟糕.我尝试了以下路线:

  1. 官方SDK:不适用于.NET Core
  2. AMQP.Net Lite:没有(体面的)文档,没有关于创建/列出主题的管理API等.只有Service Bus示例涵盖了一小部分功能,需要您已经有一个主题,等等
  3. 围绕Azure SDK(https://github.com/ppatierno/azuresblite)的AMQP.Net Lite的社区包装:不适用于.NET Core

然后,我转向REST.

https://azure.microsoft.com/en-gb/documentation/articles/service-bus-brokered-tutorial-rest/是一个很好的开始(虽然没有对.NET Core的RestSharp支持,并且出于某种原因,官方SDK似乎没有涵盖REST客户端 - 没有Swagger def,没有AutoRest客户端等).虽然这个蹩脚的示例将字符串连接到XML而不进行编码,但它涵盖了一小部分功能.

所以我决定寻找REST文档.有两个部分,"经典"REST和REST.普通的新REST不支持实际发送和接收消息(......嗯?).我厌恶使用标有"经典"的旧技术,除非我能理解它是什么 - 当然,文档在这里没有帮助.它还使用XML和ATOM而不是JSON.我不知道为什么.

额外奖励:REST API文档中链接的示例,例如来自https://msdn.microsoft.com/en-US/library/azure/hh780786.aspx,不再存在.

是否有任何可行的方法,任何人都可以使用Azure Service Bus和.NET Core读取/写入主题/订阅消息?

azure azureservicebus .net-core

11
推荐指数
1
解决办法
4024
查看次数

单元测试 ServiceBus.Message。如何设置 SystemProperties.LockToken 的值?

我想测试我使用 QueueClient 注册的消息处理程序回调queueClient.RegisterMessageHandler(MyCallBack, messageHandlerOptions)

public Task MyCallBack(Message msg, CancellationToken token)
{
   // Read msg. Do something

   // Since everything ran fine, complete the msg.
   await _client.CompleteAsync(msg.SystemProperties.LockToken);
}
Run Code Online (Sandbox Code Playgroud)

现在作为我的单元测试的一部分,我调用MyCallBack. 因为我传递了一个有效的消息。我期待client.CompleteAsync()被召唤。但是测试会抛出异常。

System.InvalidOperationException: Operation is not valid due to the current state of the object.
Run Code Online (Sandbox Code Playgroud)

这是因为,msg.SystemProperties.LockToken设置(这是因为消息没有被实际上从一个队列中与客户端读取ReceiveMode.PeekLock模式)。

有没有办法设置/模拟它,以便我可以使用虚拟字符串作为令牌运行我的测试?


PS:我知道我可以msg.SystemProperties.IsLockTokenSet在实际访问 LockToken 字段之前检查;但即使在那种情况下,如果_client.CompleteAsync()被调用,我也永远无法进行单元测试。

c# azureservicebus azure-servicebus-queues

11
推荐指数
2
解决办法
4238
查看次数

如何使用 azure 服务总线 5.0.0 在 C# azure 函数中手动处理消息完成

我正在编写一个 Azure 函数来获取 Azure 服务总线中的消息。我想手动处理任何异常("autoCompleteMessages": false)无法弄清楚如何将完整或放弃发送回服务队列。

尝试过选项1:

[FunctionName("SBQ_F1_VC")]
public static async Task Run([ServiceBusTrigger("sbqfn1", Connection = "BrnlTest1_SERVICEBUS")]
    ServiceBusReceivedMessage msg, ILogger log)
{
//.....

    if(!Int32.TryParse(msg.ApplicationProperties.GetValueOrDefault("vid").ToString(), out vid))
    {   await using ServiceBusClient client = new ServiceBusClient(Environment.GetEnvironmentVariable("BrnlTest1_SERVICEBUS"));
        ServiceBusReceiver msgRcvr = client.CreateReceiver(Environment.GetEnvironmentVariable("queueName"), new ServiceBusReceiverOptions()); 
        //await msgRcvr.RenewMessageLockAsync(msg);
        await msgRcvr.AbandonMessageAsync(msg);  //vid = 0;
    }

//.....
}
Run Code Online (Sandbox Code Playgroud)

错误选项 1

System.Private.CoreLib: Exception while executing function: SBQ_F1_VC. Azure.Messaging.ServiceBus: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or …
Run Code Online (Sandbox Code Playgroud)

c# azure azureservicebus azure-functions

11
推荐指数
1
解决办法
8036
查看次数

Azure ServiceBus QueueClient.OnMessage是否在其他线程上执行

QueueClient.OnMessage方法是否总是在不同的线程上执行回调参数?

我假设如果MaxConcurrentCalls设置为10,那么queueClient将最多启动10个线程来并行处理消息.如果传入MaxConcurrentConnection值为1或者它是否在当前线程上执行,是否会创建新线程?

我的实际问题:
在一个Worker角色中我想处理多个队列,但是它们都会同时处理.例如

        _queueClient1.OnMessage(x =>
            {
                // Do something
            }, new OnMessageOptions { MaxConcurrentCalls = 1});

        _queueClient2.OnMessage(x =>
        {
            // Do something
        }, new OnMessageOptions { MaxConcurrentCalls = 1 });

        _queueClient3.OnMessage(x =>
        {
            // Do something
        }, new OnMessageOptions { MaxConcurrentCalls = 1 });

        _queueClient4.OnMessage(x =>
        {
            // Do something
        }, new OnMessageOptions { MaxConcurrentCalls = 1 });
Run Code Online (Sandbox Code Playgroud)

这是否会导致每个回调并行执行,以便_queueClient4回调不等待_queueClient2完成才能执行?

azure azureservicebus

10
推荐指数
1
解决办法
6451
查看次数