使用Apache Camel请求回复和分散聚集

tec*_*hon 4 apache-camel

我正在尝试构建一条将执行以下操作的路线:

  1. 消费来自 jms:sender-in.我正在使用INOUT请求回复模式.该JMSReplyTo = sender-out
  2. 上述消息将被路由到多个接收者像jms:consumer1-in,jms:consumer2-injms:consumer3-in.所有人都在使用请求回复模式.在 JMSReplyTo每消费指定(在这种情况下,JMSReplyTo在这个顺序jms:consumer1-out, jms:consumer2-out, jms:consumer3-out
  3. 我需要将所有回复聚合在一起并将结果发送回去jms:sender-out.

我构建了一条类似于此的路线:

from("jms:sender-in")
    .to("jms:consumer1-in?exchangePattern=InOut&replyTo=queue:consumer1-out&preserveMessageQos=true")
    .to("jms:consumer2-in?exchangePattern=InOut&replyTo=queue:consumer2-out&preserveMessageQos=true")
    .to("jms:consumer3-in?exchangePattern=InOut&replyTo=queue:consumer3-out&preserveMessageQos=true");
Run Code Online (Sandbox Code Playgroud)

然后我将回复发送回某个队列以收集和聚合:

from("jms:consumer1-out?preserveMessageQos=true").to("jms:gather");
from("jms:consumer1-out?preserveMessageQos=true").to("jms:gather");
from("jms:consumer1-out?preserveMessageQos=true").to("jms:gather");
from("jms:gather").aggregate(header("TransactionID"), new GatherResponses()).completionSize(3).to("jms:sender-out");
Run Code Online (Sandbox Code Playgroud)

为了模仿我的消费者的行为,我添加了以下路线:

from("jms:consumer1-in").setBody(body());
from("jms:consumer2-in").setBody(body());
from("jms:consumer3-in").setBody(body());
Run Code Online (Sandbox Code Playgroud)

我有几个问题:

  1. 我在回复中收到超时错误.如果我评论聚集部分,那么没有问题.即使回复返回队列然后转发到另一个队列,为什么会有超时.
  2. 如何存储原始JMSReplyTo值,以便Camel能够将聚合结果发送回发送方的回复队列.

我有一种感觉,我正在努力解决一些基本概念.任何帮助表示赞赏.谢谢.

Pet*_*der 9

一个好问题!

您需要考虑两件事

  1. 不要混合交换模式,请求回复(InOut)与事件消息(InOnly).(除非你有充分的理由).
  2. 如果你进行分散 - 聚集,则需要对请求进行 多播,否则它们将被流水线化,而不是真正的分散 - 聚集.

我做了两个与你的情况类似的例子 - 一个有Request Reply,另一个有(单向)事件消息.

随意用jms替换activemq组件 - 在这些示例中它是相同的.

示例一,使用事件消息 - InOnly:

from("activemq:amq.in")
    .multicast()
        .to("activemq:amq.q1")
        .to("activemq:amq.q2")
        .to("activemq:amq.q3");

from("activemq:amq.q1").setBody(constant("q1")).to("activemq:amq.gather");
from("activemq:amq.q2").setBody(constant("q2")).to("activemq:amq.gather");
from("activemq:amq.q3").setBody(constant("q3")).to("activemq:amq.gather");

from("activemq:amq.gather")
    .aggregate(new ConcatAggregationStrategy())
        .header("breadcrumbId")
        .completionSize(3)
        .to("activemq:amq.out");

from("activemq:amq.out")
    .log("${body}"); // logs "q1q2q3"
Run Code Online (Sandbox Code Playgroud)

示例二,使用Request reply - 请注意散射路径必须在响应时收集响应.结果与第一个示例相同,但路由较少且配置较少.

from("activemq:amq.in2")
    .multicast(new ConcatAggregationStrategy())
        .inOut("activemq:amq.q4")
        .inOut("activemq:amq.q5")
        .inOut("activemq:amq.q6")
    .end()
    .log("Received replies: ${body}"); // logs "q4q5q6"

from("activemq:amq.q4").setBody(constant("q4"));
from("activemq:amq.q5").setBody(constant("q5"));
from("activemq:amq.q6").setBody(constant("q6"));
Run Code Online (Sandbox Code Playgroud)

至于你的问题二 - 当然,可以传递JMSReplyTo标题并强制交换模式 - 但是你将创建难以调试的代码.保持您的交换模式简单和干净 - 它可以防止错误.