使用带有activemq的camel"暂停"路由的正确方法是什么?

Den*_*ise 5 java activemq-classic apache-camel

我的基本问题是我只能在任何1小时的时间内处理来自我的一个队列(所有机器)的7000条消息.我没有看到用camel或activemq做这个的方法,所以我采用了自己的路由停止/启动逻辑.我看到了很多方法,我尝试了一些方法(只是遇到问题).

  1. camelContext.stopRoute(route):这可以使消息停止处理,但是当我调用时camelContext.startRoute(route),它会泄漏tcp连接,最终导致activemq服务器达到其极限并死亡.
  2. camelContext.suspendRoute(route):这也会阻止消息被处理并且不会泄漏连接,但它似乎会终止在我调用时不会重新激活的活动消费者(在管理面板中可见)camelContext.resumeRoute(route).我认为,即使我恢复,最终也可能导致根本没有消息从该队列中被处理掉.
  3. 实现自定义RoutePolicy.公平地说,我还没有尝试过,但似乎它会成为我根据上面选择的暂停方法遇到的同样问题的牺牲品.

有没有解决这个问题的方法,我还没有遇到过?

Pet*_*ler 5

我建议使用Throttler EIP ,而不是停止路由。

from("jms:queue:inbox")
    .throttle(7000)
    .timePeriodMillis(1000*60*60)
    .to("log:result", "mock:result");
Run Code Online (Sandbox Code Playgroud)

上面的示例将在jms:queue:inbox发送之前限制接收的消息,以mock:result确保在任何 1 小时窗口内最多发送 7000 条消息。

或者,为了更细粒度的控制,您可以定义限制路由策略,如 Camel 的限制示例所示:

<route routePolicyRef="myPolicy">
    <from uri="jms:queue:inbox"/>
    <transacted/>
    <to uri="log:+++JMS +++?groupSize=100"/>
    <to ref="foo"/>
</route>
Run Code Online (Sandbox Code Playgroud)

节流警察定义如下:

<bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
    <property name="scope" value="Context"/>
    <!-- when we hit > 20 inflight exchanges then kick in and suspend the routes -->
    <property name="maxInflightExchanges" value="20"/>
    <!-- when we hit lower than 10% of the max = 2 then kick in and resume the routes the default percentage is 70% but in this demo we want a low value -->
    <property name="resumePercentOfMax" value="10"/>
    <!-- output throttling activity at WARN level -->
    <property name="loggingLevel" value="WARN"/>
</bean>
Run Code Online (Sandbox Code Playgroud)

编辑1:

如果您需要全局限制,那么您可以首先让一个消费者读取消息,如上所述限制所有消息,然后将它们重新发送到另一个队列,并让>= 1 个分布式消费者重新读取和处理它们。

编辑2:

或者,您可以实现自己的ThrottlingInflightRoutePolicy访问保存处理信息的中央数据库。这样,您就不需要“单节点主节流器”。然而,数据库也可能是单点故障。