Łuk*_*man 8 java activemq-classic jms cxf mule
这个问题
如何配置ActiveMQ和<flow>进入Mule ESB 3.2,以确保从队列中提取的消息最终由外部正确处理CXF service?
脚本
我有一个CXF端点,它应该接收传入消息并尽快将其传输到三个外部服务.我们称它们为EX1,EX2,EX3.由于<all>Mule 3.x中引入了组件,这非常简单.
整个解决方案最重要的要求是确保每个收到的消息最终都交付给所有三个CXF服务.所以我们最终得出了这个想法,将每个传入的消息放入Persistent JMS queues(Q1,Q2,Q3).在从队列Qn读取消息之后,它直接转移到相应的EXn端点,因此 - 外部服务.
配置
(我可以根据要求提供完整的配置)
我们已经按照此处的描述配置了ActiveMQ代理,并将其与我们的<flow>配置连接起来.一切似乎按预期工作,我有JConsole连接到我的应用程序,所以我可以看到消息是PERSISTENT类型,他们最终排队等候.如果一切顺利 - 所有三个服务EXn都会收到消息.
测试
当我们关闭其中一个服务时,问题就出现了,让我们说EX2,并重新启动整个服务器模拟失败.消息最终会丢失(我想这不是那么持久,是吧?).最奇怪的是 - 如果我们在EX2关闭时发送了10条消息,那么在服务器重启后,其中9条正在被重新传送!所以我想也许,也许,这10条消息中有9条已经正确入队,而当服务器发生故障时,这条消息中的9条经常被重新传送.
这让我觉得,CXF端点没有处理事务支持,说实话我无法理解.毕竟,当我尝试重新传递时,我可以看到消息在队列中,所以它应该被保留.这显然不是,但为什么呢?
我自己的尝试 我已经尝试过很多东西,但都没有.总是有一条消息丢失.
<jms:transaction />在流程中使用任何标签 - 不起作用<cxf:jaxws-client /><xa-transaction />- 并没有用<default-exception-strategy>配置 - 如果我记得它让事情变得更糟任何帮助表示赞赏,谢谢.
ACTIVE MQ CONFIGURATION
<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
<spring:property name="queue" value="queue.*"/>
<spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>
<spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
<spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
</spring:bean>
<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
<spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&broker.persistent=true&broker.useJmx=true"/>
<spring:property name="redeliveryPolicy">
<spring:bean class="org.apache.activemq.RedeliveryPolicy">
<spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
<spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
<spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
<spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
</spring:bean>
</spring:property>
</spring:bean>
<spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
<spring:property name="directory" value="/home/bachman/activemq"/>
</spring:bean>
<spring:bean name="AmqBroker"
class="org.apache.activemq.broker.BrokerService"
init-method="start"
destroy-method="stop">
<spring:property name="brokerName" value="esb-amq-broker"/>
<spring:property name="persistent" value="true"/>
<spring:property name="dataDirectory" value="/home/bachman/activemq"/>
<spring:property name="useJmx" value="true"/>
<spring:property name="useShutdownHook" value="false"/>
<spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
<spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
</spring:bean>
<jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
disableTemporaryReplyToDestinations="true"/>
Run Code Online (Sandbox Code Playgroud)
FLOW - 将传入消息分派给3个队列Qn
<flow name="dispatch-to-queues">
<inbound-endpoint ref="incoming-cxf"/>
<!-- Each received message ends up to be sent to all destinations -->
<all>
<jms:outbound-endpoint name="queue.q1"
queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q1"
connector-ref="PersistentJMSConnector"/>
<jms:outbound-endpoint name="queue.q2"
queue="queue.q2" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on q2"
connector-ref="PersistentJMSConnector" />
<jms:outbound-endpoint name="queue.q3"
queue="queue.q3" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q3"
connector-ref="PersistentJMSConnector" />
</all>
<custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
</flow>
Run Code Online (Sandbox Code Playgroud)
FLOW - 处理从Qn到EXn的交付
<flow name="from-q1-to-ex1">
<jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Pull from q1."
connector-ref="PersistentJMSConnector">
<jms:transaction action="ALWAYS_BEGIN" />
</jms:inbound-endpoint>
<logger message="Sending message to EX-1" level="INFO" />
<!-- Handle errors at this point in flow
<custom-processor class="pl.exception.lookup.Component">
<spring:property name="targetModuleName" value="Not-important"/>
</custom-processor>
-->
<outbound-endpoint ref="ex1-cxf-endpoint">
<jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
</outbound-endpoint>
</flow>
Run Code Online (Sandbox Code Playgroud)
ENDPOINTS - 引用端点的声明
<endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
<cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
</endpoint>
<endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
<cxf:jaxws-client
clientClass="com.mycompany.services.Ex1"
wsdlLocation="classpath:wsdl/ex1.wsdl"
operation="someOperation"
port="SomePort"/>
</endpoint>
Run Code Online (Sandbox Code Playgroud)
在事务中使用JMS消息是解决方案按预期工作的必要条件:如果在CXF出站阶段发生异常,JMS消息将最终回滚,然后重新传递,从而触发新的CXF调用.
您必须仔细配置ActiveMQ客户端的重新传递策略,以便重试足够的时间,也可能不会太快(例如,指数退避).您还希望适当地处理DLQ.显示了ActiveMQ在Mule中使用Spring Beans的客户端配置:http://www.mulesoft.org/mule-activemq-integration-examples
还要确保在配置工厂中引用正确的代理URL.使用您的经纪人名称esb-amq-broker,您的配置工厂应该是:
<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
<spring:property name="brokerURL" value="vm://esb-amq-broker"/>
...
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
10507 次 |
| 最近记录: |