如何延迟 Azure 服务总线消息?

Nin*_*oel 6 c# azureservicebus

当前我正在使用Microsoft.Azure.ServiceBus.IQueueClientto RegisterMessageHandler,然后我收到的消息是类型Microsoft.Azure.ServiceBus.Message

根据文档

消息延迟 API API 在 .NET Framework 客户端中是 BrokeredMessage.Defer 或 BrokeredMessage.DeferAsync,在 .NET Standard 客户端中是 MessageReceiver.DeferAsync,在 Java 客户端中是 IMessageReceiver.defer 或 IMessageReceiver.deferAsync。

...但这些库中没有一个与我实际使用的类相关。我如何推迟?为了能够延迟消息,我必须使用哪些类和东西?上面的所有示例都没有提供足够的代码片段来解释它。

根据@Gaurav 的要求进行更新

从您的回答中,我可以看到我的消息具有该属性:

message.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddHours(1);
Run Code Online (Sandbox Code Playgroud)

queueClient也有这个方法:

queueClient.ScheduleMessageAsync(message, DateTime.UtcNow.AddHours(1));
Run Code Online (Sandbox Code Playgroud)

我将尝试“ scheduledMessageAsync”,因为我无法看到如何在ScheduledEnqueueTimeUtc不调用queueClient

Gau*_*tri 6

Microsoft.Azure.ServiceBus.Message有一个名为ScheduledEnqueueTimeUtc. 以后当您希望消息出现在队列中时,只需将此属性的值设置为日期/时间值即可。消息将隐藏到那个时间,并且只会在该日期/时间出现在队列中。

更新

所以我跑了测试,并确认双方ScheduledEnqueueTimeUtcScheduleMessageAsync作品。我使用4.1.1Microsoft.Azure.ServiceBusSDK版本。

这是我写的代码:

    static void Main(string[] args)
    {
        var connectionString = "my-connection-string";
        var queueName = "test";
        QueueClient queueClient = new QueueClient(connectionString, queueName);
        Message msg1 = new Message()
        {
            Body = Encoding.UTF8.GetBytes("This message has ScheduledEnqueueTimeUtc property set. It will appear in queue after 2 minutes. Current date/time is: " + DateTime.Now),
            ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(2)
        };
        queueClient.SendAsync(msg1).GetAwaiter().GetResult();
        Message msg2 = new Message()
        {
            Body = Encoding.UTF8.GetBytes("This message is sent via ScheduleMessageAsync method. It will appear in queue after 2 minutes. Current date/time is: " + DateTime.Now)
        };
        queueClient.ScheduleMessageAsync(msg2, new DateTimeOffset(DateTime.UtcNow.AddMinutes(2))).GetAwaiter().GetResult();
        Console.ReadLine();
    }
Run Code Online (Sandbox Code Playgroud)

这就是我在Peek-Lock模式下获取消息时看到的:

在此处输入图片说明


Nin*_*oel 1

我已经编写了我正在寻找的解决方案,这是基本轮廓:

在异步方法内部(运行自己的线程)

public async Task InitialiseAndRunMessageReceiver()
Run Code Online (Sandbox Code Playgroud)

启动读取消息的无限循环

receiver = new MessageReceiver(serviceBusConnectionString, serviceBusQueueName, ReceiveMode.PeekLock); 
while (true) { var message = await receiver.ReceiveAsync(); ... more code... }
Run Code Online (Sandbox Code Playgroud)

一旦您知道您即将开始长期任务,请推迟消息,但存储message.SystemProperties.SequenceNumber. 这会将其保留在队列中,但会阻止其重新传送。

await receiver.DeferAsync(message.SystemProperties.LockToken);
Run Code Online (Sandbox Code Playgroud)

当您最终完成后,再次使用 请求消息message.SystemProperties.SequenceNumber,并完成消息,就好像它没有被推迟一样

var message = receiver.ReceiveDeferredMessageAsync(message.SystemProperties.SequenceNumber);
receiver.CompleteAsync(message.Result.SystemProperties.LockToken);
Run Code Online (Sandbox Code Playgroud)

并且您的消息将从队列中删除。

我的大部分困惑是由于图书馆的名称相似且寿命重叠而引起的。

Microsoft.Azure.ServiceBus.Core.MessageReceiver就是上面的消息接收者