使用 JMS 安排 Azure 服务总线的消息

Mat*_*tti 3 java jms azure qpid azureservicebus

我想使用 JMS 将计划消息发送到 Azure 服务总线。我的代码是基于org.apache.qpid.jms.message.JmsMessage. 我找到了针对给定问题的一种解决方案,但它使用org.apache.qpid.proton.message.Messagewhich has .getMessageAnnotations(),它允许编辑消息注释并添加一些由 Azure 服务总线正确识别和处理的属性。我的消息 impl 缺少该方法。

我在官方文档和node.js 中的实现中发现,要使用Azure 服务总线安排消息,您需要发送BrokerProperties/brokerProperties具有有效json 的标头。其他标头/属性将被标记为Customer properties并被 Azure 服务总线忽略。

关于 JMS 的官方 azure 文档ScheduledEnqueueTimeUtc表示JMS API 不正式支持该设置。但可以通过手动设置属性来实现。

因此,当我将消息发送到队列时,我可以在 lambda 中对其进行后期处理并设置一些属性:

jmsTemplate.convertAndSend(queue, payload, message -> {
    var date = Date.from(ZonedDateTime.now(ZoneId.of("UTC")).plus(delay, ChronoUnit.MILLIS).toInstant());
    var brokerProps = Map.of("ScheduledEnqueueTimeUtc", date.toGMTString());
    message.setStringProperty(
        "brokerProperties",
        objectMapper.writeValueAsString(brokerProps)
    );
    return message;
});
Run Code Online (Sandbox Code Playgroud)

但这不起作用。消息到达队列,但当我尝试在 Azure 上的服务总线资源管理器上查看它时,它会在浏览器控制台中抛出错误,并且该操作将永远持续。我想设置该属性brokerProperties会对服务总线产生一些影响。我还尝试发送带有日期作为字符串的地图(带有Azure使用的日期格式)"ScheduledEnqueueTimeUtc", "Thu, 25 Mar 2021 12:54:00 GMT",但它也被服务总线识别为错误(窥视永远持续,并在浏览器控制台中引发错误) 。

我尝试设置类似x-opt-scheduled-enqueue-timex-ms-scheduled-enqueue-time在其他线程中找到的字符串属性,但它们都不适合我的示例。

我看到 Microsoft 提供了一些用于 Java 与 Azure 服务总线通信的库,但我需要在代码中保持与云提供商的独立性,并且不包含任何其他库。

是否有使用包中的 JMS 消息实现为 Azure 服务总线org.apache.qpid.jms.message.JmsMessage设置的示例?BrokerProperties

Rap*_*ler 5

我的团队目前面临着同样的问题。

我们发现ScheduledEnqueueTimeUtc属性是在MessageAnnotationsMap中设置的。不幸的是,org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacadeJMS 使用的 已将 getter 和 setter 设置为包私有。但我们发现您可以使用该setTracingAnnotation(String key, Object value)方法。

例子:

public void sendDelayedMessage() {
    final var now = ZonedDateTime.now();
    jmsTemplate.send("test-queue", session -> {
        final var tenMinutesFromNow = now.plusMinutes(10);
        final var textMessage = session.createTextMessage("Hello Service Bus!");
        ((JmsTextMessage) textMessage).getFacade().setTracingAnnotation("x-opt-scheduled-enqueue-time", Date.from(tenMinutesFromNow.toInstant()));
        return textMessage;
    });
    log.info("Sent at: " + now);
}
Run Code Online (Sandbox Code Playgroud)

证明: 在此输入图像描述

非常感谢我的队友!!