Apache Camel MQXAQueueConnectionFactory

J2B*_*J2B 5 apache-camel ibm-mq

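OK, I'm trying to get MQXAQueueConnectionFactory to work. I have created a class that extends JmsComponent to handle the username and password when sending data to the queue. It does get and put messages on the queue, but in my case I have created a route to test XA, for example: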

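 from("wmq:queue:incomingQueue")
     .process(new Processor() {
         @Override
         public void process(Exchange exchange) throws Exception {
             // ... (elided in the original post)
             Thread.sleep(20000); // window in which the queue manager is stopped
         }
     })
     .to("wmq:queue:outgoingQueue");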

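During the sleep, I shut down the queue manager. However, when I then try to get the uncommitted message from the queue, DISPLAY QSTATUS('qChainQueue') reports CURDEPTH(0), whereas it should be 1 as far as I understand the XA part.

  • Am I doing this completely wrong?
  • How should I test this?

Helper class to handle WMQ: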

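import javax.jms.JMSException;

import org.apache.camel.Endpoint;
import org.apache.camel.component.jms.JmsComponent;

import com.ibm.mq.jms.JMSC;
import com.ibm.mq.jms.MQXAQueueConnectionFactory;

public class WMQComponent extends JmsComponent {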
    private final String username;
    private final String password;

    public WMQComponent(String hostname, int port, String username, String password,
                        String queueManager, String channel) throws JMSException {
        super();
        this.username = username;
        this.password = password;

        MQXAQueueConnectionFactory connectionFactory = new MQXAQueueConnectionFactory();
        connectionFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
        connectionFactory.setFailIfQuiesce(1);

        connectionFactory.setHostName(hostname);
        connectionFactory.setPort(port);
        connectionFactory.setQueueManager(queueManager);
        connectionFactory.setChannel(channel);

        setConnectionFactory(connectionFactory);
    }

    @Override
    public Endpoint createEndpoint(String uri) throws Exception {
        if (uri.contains("username") || uri.contains("password")) {
            throw new IllegalStateException("Username and password is set by the component");
        }
        if (uri.contains("?")) {
            return super.createEndpoint(uri + "&username=" + username + "&password=" + password);
        } else {
            return super.createEndpoint(uri + "?username=" + username + "&password=" + password);
        }
    }

}
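
One caveat with the `contains` checks above: they match anywhere in the URI, so a queue whose name happens to contain "username" or "password" would also be rejected. Below is a minimal, stdlib-only sketch that inspects only the query string. The class name `EndpointUriCredentials` and the queue name `usernameUpdates` are made up for illustration; this is not part of the Camel API, and values are appended verbatim as in the original helper.

```java
import java.util.Objects;

/** Sketch: append credentials to an endpoint URI, checking only the query parameters. */
final class EndpointUriCredentials {

    private EndpointUriCredentials() {
    }

    static String withCredentials(String uri, String username, String password) {
        Objects.requireNonNull(uri, "uri");
        int queryStart = uri.indexOf('?');
        String query = queryStart < 0 ? "" : uri.substring(queryStart + 1);

        // Reject only URIs that already carry a username/password parameter,
        // not URIs whose queue name merely contains those words.
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            if (key.equals("username") || key.equals("password")) {
                throw new IllegalStateException("Username and password are set by the component");
            }
        }

        // Same separator rule as the original helper: '&' if a query exists, '?' otherwise.
        String separator = queryStart < 0 ? "?" : "&";
        return uri + separator + "username=" + username + "&password=" + password;
    }

    public static void main(String[] args) {
        // Hypothetical queue name that the substring check would have rejected.
        System.out.println(withCredentials("wmq:queue:usernameUpdates", "app", "secret"));
        System.out.println(withCredentials("wmq:queue:incomingQueue?concurrentConsumers=2", "app", "secret"));
    }
}
```

Inside `createEndpoint`, the two `super.createEndpoint(...)` branches could then collapse into a single call on the returned string.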

The following error occurs:

2015-03-25 14:01:12,077 [ #2 - Multicast] INFO  dest_chain_ldap                - org.springframework.jms.IllegalStateException: JMSWMQ0018: Failed to connect to queue manager 'QMBATCHESB' with connection mode 'Client' and host name 'hostname.com'.; nested exception is com.ibm.msg.client.jms.DetailedIllegalStateException: JMSWMQ0018: Failed to connect to queue manager 'QMBATCHESB' with connection mode 'Client' and host name 'hostname.com'. Check the queue manager is started and if running in client mode, check there is a listener running. Please see the linked exception for more information.; nested exception is com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2059' ('MQRC_Q_MGR_NOT_AVAILABLE').
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:279)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:168)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:469)
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:228)
    at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:431)
    at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:385)
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:120)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:163)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:416)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:120)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:163)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:416)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)
    at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:732)
    at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:82)
    at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:303)
    at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:288)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
Caused by: com.ibm.msg.client.jms.DetailedIllegalStateException: JMSWMQ0018: Failed to connect to queue manager 'QMBATCHESB' with connection mode 'Client' and host name 'hostname.com'. Check the queue manager is started and if running in client mode, check there is a listener running. Please see the linked exception for more information.
    at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:496)
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:236)
    at com.ibm.msg.client.wmq.internal.WMQConnection.<init>(WMQConnection.java:430)
    at com.ibm.msg.client.wmq.internal.WMQXAConnection.<init>(WMQXAConnection.java:70)
    at com.ibm.msg.client.wmq.factories.WMQXAConnectionFactory.createV7ProviderConnection(WMQXAConnectionFactory.java:190)
    at com.ibm.msg.client.wmq.factories.WMQConnectionFactory.createProviderConnection(WMQConnectionFactory.java:6210)
    at com.ibm.msg.client.jms.admin.JmsConnectionFactoryImpl.createConnection(JmsConnectionFactoryImpl.java:278)
    at com.ibm.mq.jms.MQConnectionFactory.createCommonConnection(MQConnectionFactory.java:6155)
    at com.ibm.mq.jms.MQQueueConnectionFactory.createQueueConnection(MQQueueConnectionFactory.java:144)
    at com.ibm.mq.jms.MQQueueConnectionFactory.createConnection(MQQueueConnectionFactory.java:223)
    at org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter.doCreateConnection(UserCredentialsConnectionFactoryAdapter.java:175)
    at org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter.createConnection(UserCredentialsConnectionFactoryAdapter.java:150)
    at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:184)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:456)
    ... 29 more
Caused by: com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2059' ('MQRC_Q_MGR_NOT_AVAILABLE').
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:223)
    ... 41 more
Caused by: com.ibm.mq.jmqi.JmqiException: CC=2;RC=2059;AMQ9204: Connection to host 'hostname.com(1514)' rejected. [1=com.ibm.mq.jmqi.JmqiException[CC=2;RC=2059;AMQ9213: A communications error for  occurred. [1=java.net.ConnectException[Connection refused: connect],3=hostname.com]],3=hostname.com(1514),5=RemoteTCPConnection.connnectUsingLocalAddress]
    at com.ibm.mq.jmqi.remote.internal.RemoteFAP.jmqiConnect(RemoteFAP.java:1831)
    at com.ibm.msg.client.wmq.internal.WMQConnection.<init>(WMQConnection.java:345)
    ... 40 more
Caused by: com.ibm.mq.jmqi.JmqiException: CC=2;RC=2059;AMQ9213: A communications error for  occurred. [1=java.net.ConnectException[Connection refused: connect],3=hostname.com]
    at com.ibm.mq.jmqi.remote.internal.RemoteTCPConnection.connnectUsingLocalAddress(RemoteTCPConnection.java:612)
    at com.ibm.mq.jmqi.remote.internal.RemoteTCPConnection.protocolConnect(RemoteTCPConnection.java:940)
    at com.ibm.mq.jmqi.remote.internal.system.RemoteConnection.connect(RemoteConnection.java:1097)
    at com.ibm.mq.jmqi.remote.internal.system.RemoteConnectionPool.getConnection(RemoteConnectionPool.java:348)
    at com.ibm.mq.jmqi.remote.internal.RemoteFAP.jmqiConnect(RemoteFAP.java:1503)
    ... 41 more
Caused by: java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.connect0(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:69)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:157)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391)
    at java.net.Socket.connect(Socket.java:579)
    at java.net.Socket.connect(Socket.java:528)
    at com.ibm.mq.jmqi.remote.internal.RemoteTCPConnection$2.run(RemoteTCPConnection.java:597)
    at java.security.AccessController.doPrivileged(Native Method)
    at com.ibm.mq.jmqi.remote.internal.RemoteTCPConnection.connnectUsingLocalAddress(RemoteTCPConnection.java:588)
    ... 45 more

J2B*_*J2B 3

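So I was doing it slightly wrong: using MQXAConnectionFactory alone is not enough; I also had to create the JmsComponent as transacted.

I tried stopping the queue manager while the application was running, and stopping the application while a message was being processed, and it appears to roll back as expected.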
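
To picture what "roll back as expected" means for the queue depth, here is a toy in-memory model of a transacted consume. It is only a sketch under simplifying assumptions: it is not IBM MQ, not real XA, and the class `TransactedQueueSketch` and the message `order-1` are invented for illustration. A failure in the handler stands in for the queue manager or application stopping mid-processing.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Toy in-memory model of a transacted consume; not IBM MQ and not real XA. */
final class TransactedQueueSketch {

    private final Deque<String> messages = new ArrayDeque<>();

    void put(String message) {
        messages.addLast(message);
    }

    int depth() {
        return messages.size();
    }

    /**
     * Takes the head message, runs the handler, and commits only if the handler
     * returns normally. On a runtime exception the message is put back at the head.
     */
    boolean consumeTransacted(Consumer<String> handler) {
        String message = messages.pollFirst();
        if (message == null) {
            return false; // nothing to consume
        }
        try {
            handler.accept(message);
            return true; // commit: the message stays removed
        } catch (RuntimeException e) {
            messages.addFirst(message); // rollback: the message is visible again
            return false;
        }
    }

    public static void main(String[] args) {
        TransactedQueueSketch queue = new TransactedQueueSketch();
        queue.put("order-1");

        // Simulate a failure while the processor is still working on the message.
        queue.consumeTransacted(m -> {
            throw new IllegalStateException("connection lost during processing");
        });
        System.out.println("depth after rollback: " + queue.depth());

        // A handler that completes normally lets the consume commit.
        queue.consumeTransacted(m -> { });
        System.out.println("depth after commit: " + queue.depth());
    }
}
```

Without the transacted behaviour, the message would already be gone when the handler fails, which matches the CURDEPTH(0) observation in the question.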

I ended up with:

public static JmsComponent mqXAComponentTransacted(String hostname, int port, String username, String password,
                           String queueManager, String channel) throws JMSException {
    MQXAQueueConnectionFactory connectionFactory = new MQXAQueueConnectionFactory();
    connectionFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
    connectionFactory.setFailIfQuiesce(1);
    connectionFactory.setHostName(hostname);
    connectionFactory.setPort(port);
    connectionFactory.setQueueManager(queueManager);
    connectionFactory.setChannel(channel);

    UserCredentialsConnectionFactoryAdapter connectionFactoryAdapter=new UserCredentialsConnectionFactoryAdapter();
    connectionFactoryAdapter.setTargetConnectionFactory(connectionFactory);
    connectionFactoryAdapter.setUsername(username);
    connectionFactoryAdapter.setPassword(password);

    return JmsComponent.jmsComponentTransacted(connectionFactoryAdapter);
}

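As for UserCredentialsConnectionFactoryAdapter: I did not want to use Spring components, but since the JMS package already depends on Spring, using the adapter to handle credentials is easier than my previous solution.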
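
The idea behind such an adapter can be sketched without Spring or JMS on the classpath: wrap the target factory and turn every no-argument `createConnection()` into a call that passes the stored credentials. Everything below is a simplified stand-in, assuming a two-method factory interface; `SimpleConnectionFactory`, `FakeFactory` and `CredentialsAdapterSketch` are hypothetical names, and connections are represented as plain strings.

```java
/** Hypothetical stand-in for a JMS connection factory, reduced to two methods. */
interface SimpleConnectionFactory {
    String createConnection();

    String createConnection(String username, String password);
}

/** Fake target that just reports which identity was used to connect. */
final class FakeFactory implements SimpleConnectionFactory {
    @Override
    public String createConnection() {
        return "connected anonymously";
    }

    @Override
    public String createConnection(String username, String password) {
        return "connected as " + username;
    }
}

/** Sketch of the adapter idea: inject stored credentials into the no-argument call. */
final class CredentialsAdapterSketch implements SimpleConnectionFactory {
    private final SimpleConnectionFactory target;
    private final String username;
    private final String password;

    CredentialsAdapterSketch(SimpleConnectionFactory target, String username, String password) {
        this.target = target;
        this.username = username;
        this.password = password;
    }

    @Override
    public String createConnection() {
        // Callers that know nothing about credentials still connect with them.
        return target.createConnection(username, password);
    }

    @Override
    public String createConnection(String username, String password) {
        // Explicitly supplied credentials are passed through unchanged.
        return target.createConnection(username, password);
    }

    public static void main(String[] args) {
        SimpleConnectionFactory factory = new CredentialsAdapterSketch(new FakeFactory(), "app", "secret");
        System.out.println(factory.createConnection());
    }
}
```

This is why the credentials no longer need to be appended to each endpoint URI: the code that asks for a connection never sees them.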