在重试邮件之前,Azure Service Bus是否可以延迟?

Pau*_*ner 23 azure azureservicebus

Azure Service Bus支持内置的重试机制,该机制使被放弃的消息立即可见以进行另一次读取尝试.我正在尝试使用此机制来处理一些瞬态错误,但消息在被放弃后立即可用.

我想做的是让信息在被放弃后的一段时间内不可见,最好是基于指数递增的策略.

ScheduledEnqueueTimeUtc在放弃消息时尝试设置属性,但似乎没有效果:

var messagingFactory = MessagingFactory.CreateFromConnectionString(...);

var receiver = messagingFactory.CreateMessageReceiver("test-queue");

receiver.OnMessageAsync(async brokeredMessage =>
{
    await brokeredMessage.AbandonAsync(
        new Dictionary<string, object>
        {
            { "ScheduledEnqueueTimeUtc", DateTime.UtcNow.AddSeconds(30) }
        });
    }
});
Run Code Online (Sandbox Code Playgroud)

我已经考虑过根本不放弃消息而只是让锁过期,但是这需要一些方法来影响MessageReceiver指定消息的锁定持续时间的方式,而我在API中找不到任何东西让我改变这个价值.此外,在已经需要锁定之前,将无法读取消息的传递计数(因此决定等待下一次重试的时间).

消息总线中的重试策略是否可以以某种方式受到影响,还是可以通过其他方式人为引入延迟?

Mik*_*eWo 14

这里要小心,因为我认为您将重试功能与事件驱动的消息处理的自动Complete/ Abandon机制混淆OnMessage.当对服务总线的调用失败时,内置重试机制开始起作用.例如,如果您调用将消息设置为完成且失败,则重试机制将启动.如果您正在处理消息,则您自己的代码中会发生异常,不会通过重试功能触发重试.如果错误来自您的代码或尝试联系服务总线,您的问题不会明确.

如果确实在修改了尝试与服务总线通信时发生错误时发生的重试策略,则可以修改RetryPolicy在其MessageReciver自身上设置的重试策略.RetryExponitial默认情况下会使用一个,以及RetryPolicy您可以创建自己的摘要.

我认为你所追求的是更多地控制当你处理异常时会发生什么,并且你想要推迟处理该消息.有几个选择:

创建消息处理程序时,可以设置OnMessageOptions.其中一个属性是"自动完成".默认情况下,此参数设置为true,这意味着一旦完成消息处理,就会Complete自动调用该方法.如果发生异常,则会自动调用放弃,这就是您所看到的.通过将AutoComplete设置为false,您需要在消息处理程序中自行调用Complete.如果不这样做将导致消息锁定最终耗尽,这是您正在寻找的行为之一.

因此,您可以编写处理程序,以便在处理过程中发生异常时,您根本不会调用Complete.然后该消息将保留在队列中,直到它的锁定用完为止,然后再次可用.标准的死亡刻字机制适用,在x次尝试后,它将自动放入死信队列.

处理这种方式的一个注意事项是任何类型的异常都将以这种方式处理.你真的需要考虑什么类型的异常正在做这个以及你是否真的想要推迟处理.例如,如果您在处理期间呼叫第三方系统并且它给您一个例外,您知道这是一个短暂的,很棒的.但是,如果它给你一个错误,你知道这将是一个大问题,那么你可能决定在系统中做一些其他事情,除了对消息哄骗.

您还可以查看" Defer"方法.实际上,该方法不允许从队列中处理该消息,除非它被序列号特别拉出.您的代码必须记住序列号值并将其拉出.这不是你所描述的.

另一个选择是你可以放弃OnMessage,事件驱动的处理消息的方式.虽然这非常有用,但您无法对事物进行大量控制.而是连接自己的处理循环并自己处理放弃/完成.您还需要处理OnMessage模式为您提供的一些线程/并发调用管理.这可能是更多的工作,但你有最大的灵活性.

最后,我认为您AbandonAsync传递要修改的属性的调用不起作用的原因是这些属性是指方法上的元数据属性,而不是BrokeredMessage上的标准属性.


Dre*_*rsh 11

我实际上去年问了同样的问题(实现不谈)我用三种方法来考虑API.在SB团队工作的@ClemensVasters回应说,使用Defer某种重新接收实际上是精确控制这种情况的唯一方法.

您可以阅读我对其答案的回答,以了解具体方法,我建议使用辅助队列来存储指示哪些主要消息已被延迟并需要从主队列重新接收的消息.然后,您可以通过设置ScheduledEnqueueTimeUtc这些辅助消息来控制等待多长时间,以便在重试之前准确控制等待的时间.

  • 我认为德鲁的方法会奏效。原始消息永远不会从主队列中删除。它被标记为延迟,这意味着除非使用序列号来检索它,否则它不会被取回。辅助队列消息仅包含序列号。请记住,带有窥视锁的代理消息传递为您提供“至少一次”处理,而不是“恰好一次”处理。如果您只需要一次,则必须自己处理。 (2认同)
  • 如果要引入辅助队列,那么延迟消息有什么意义呢?只需使用“ScheduledEnqueueTimeUtc”将相同的消息重新排队到同一队列即可 (2认同)
  • 如何重新排队完全相同的消息,但仅修改“ScheduledEnqueueTimeUtc”?收到的消息的类型为“ServiceBusReceivedMessage”,仅具有这些属性的 getter。如果创建新的“ServiceBusMessage(ServiceBusReceivedMessage)”,它将重置传递计数。 (2认同)