如何确保来自JMS队列的消息传递到外部WebService(CXF)?

Łuk*_*man 8 java activemq-classic jms cxf mule

这个问题

如何配置ActiveMQ<flow>进入Mule ESB 3.2,以确保从队列中提取的消息最终由外部正确处理CXF service

脚本

我有一个CXF端点,它应该接收传入消息并尽快将其传输到三个外部服务.我们称它们为EX1,EX2,EX3.由于<all>Mule 3.x中引入了组件,这非常简单.

整个解决方案最重要的要求是确保每个收到的消息最终都交付给所有三个CXF服务.所以我们最终得出了这个想法,将每个传入的消息放入Persistent JMS queues(Q1,Q2,Q3).在从队列Qn读取消息之后,它直接转移到相应的EXn端点,因此 - 外部服务.

配置

(我可以根据要求提供完整的配置)

我们已经按照此处的描述配置了ActiveMQ代理,并将其与我们的<flow>配置连接起来.一切似乎按预期工作,我有JConsole连接到我的应用程序,所以我可以看到消息是PERSISTENT类型,他们最终排队等候.如果一切顺利 - 所有三个服务EXn都会收到消息.

测试

当我们关闭其中一个服务时,问题就出现了,让我们说EX2,并重新启动整个服务器模拟失败.消息最终会丢失(我想这不是那么持久,是吧?).最奇怪的是 - 如果我们在EX2关闭时发送了10条消息,那么在服务器重启后,其中9条正在被重新传送!所以我想也许,也许,这10条消息中有9条已经正确入队,而当服务器发生故障时,这条消息中的9条经常被重新传送.

这让我觉得,CXF端点没有处理事务支持,说实话我无法理解.毕竟,当我尝试重新传递时,我可以看到消息在队列中,所以它应该被保留.这显然不是,但为什么呢?

我自己的尝试 我已经尝试过很多东西,但都没有.总是有一条消息丢失.

  1. 不要<jms:transaction />在流程中使用任何标签 - 不起作用
  2. 在收到消息时启动jms事务,在发送时加入 <cxf:jaxws-client />
  3. 将XA与JBoss一起使用<xa-transaction />- 并没有用
  4. 提供<default-exception-strategy>配置 - 如果我记得它让事情变得更糟

任何帮助表示赞赏,谢谢.

CONFIG

ACTIVE MQ CONFIGURATION

<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
    <spring:property name="queue" value="queue.*"/>
    <spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>

<spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
    <spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
</spring:bean>

<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&amp;broker.persistent=true&amp;broker.useJmx=true"/>
    <spring:property name="redeliveryPolicy">
        <spring:bean class="org.apache.activemq.RedeliveryPolicy">
            <spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
            <spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
            <spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
            <spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
        </spring:bean>
    </spring:property>
</spring:bean>

<spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
    <spring:property name="directory" value="/home/bachman/activemq"/>
</spring:bean>

<spring:bean name="AmqBroker"
             class="org.apache.activemq.broker.BrokerService"
             init-method="start"
             destroy-method="stop">
    <spring:property name="brokerName" value="esb-amq-broker"/>
    <spring:property name="persistent" value="true"/>
    <spring:property name="dataDirectory" value="/home/bachman/activemq"/>
    <spring:property name="useJmx" value="true"/>
    <spring:property name="useShutdownHook" value="false"/>
    <spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
    <spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
</spring:bean>

<jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
                        numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
                        connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
                        disableTemporaryReplyToDestinations="true"/>
Run Code Online (Sandbox Code Playgroud)

FLOW - 将传入消息分派给3个队列Qn

<flow name="dispatch-to-queues">
        <inbound-endpoint ref="incoming-cxf"/>

        <!-- Each received message ends up to be sent to all destinations -->
        <all>
            <jms:outbound-endpoint name="queue.q1"
                queue="queue.q1" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on Q1"
                    connector-ref="PersistentJMSConnector"/>

            <jms:outbound-endpoint name="queue.q2"
                queue="queue.q2" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on q2"
                connector-ref="PersistentJMSConnector" />

            <jms:outbound-endpoint name="queue.q3"
                queue="queue.q3" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on Q3"
                connector-ref="PersistentJMSConnector" />

        </all>
        <custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
    </flow>
Run Code Online (Sandbox Code Playgroud)

FLOW - 处理从Qn到EXn的交付

<flow name="from-q1-to-ex1">
        <jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
            disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
            doc:name="JMS" doc:description="Pull from q1."
            connector-ref="PersistentJMSConnector">
                <jms:transaction action="ALWAYS_BEGIN" />
        </jms:inbound-endpoint>
        <logger message="Sending message to EX-1" level="INFO" />

        <!-- Handle errors at this point in flow
        <custom-processor class="pl.exception.lookup.Component">
            <spring:property name="targetModuleName" value="Not-important"/>
        </custom-processor>
        -->


        <outbound-endpoint ref="ex1-cxf-endpoint">
            <jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
        </outbound-endpoint>
    </flow>
Run Code Online (Sandbox Code Playgroud)

ENDPOINTS - 引用端点的声明

<endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
        <cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
    </endpoint> 

<endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
        <cxf:jaxws-client
                clientClass="com.mycompany.services.Ex1"
                wsdlLocation="classpath:wsdl/ex1.wsdl"
                operation="someOperation"
                port="SomePort"/>
    </endpoint>
Run Code Online (Sandbox Code Playgroud)

Dav*_*sot 5

在事务中使用JMS消息是解决方案按预期工作的必要条件:如果在CXF出站阶段发生异常,JMS消息将最终回滚,然后重新传递,从而触发新的CXF调用.

您必须仔细配置ActiveMQ客户端的重新传递策略,以便重试足够的时间,也可能不会太快(例如,指数退避).您还希望适当地处理DLQ.显示了ActiveMQ在Mule中使用Spring Beans的客户端配置:http://www.mulesoft.org/mule-activemq-integration-examples

还要确保在配置工厂中引用正确的代理URL.使用您的经纪人名称esb-amq-broker,您的配置工厂应该是:

<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <spring:property name="brokerURL" value="vm://esb-amq-broker"/>
    ...
Run Code Online (Sandbox Code Playgroud)

  • 好的,我认为Mule做正确的事。当我希望没有JMS消息丢失的有力保证时,我可以运行非嵌入式(即独立)和HA模式(集群)的JMS提供程序。现在,对于您的特殊情况,代理名称不匹配可能是问题所在:您可能正在从连接工厂开始第二次在内存代理中进行操作。 (2认同)