带有IBM MQ的Apache Camel

Nit*_*tin 9 apache apache-camel ibm-mq

您有没有人使用过Camel和IBM的MQ.我们正在考虑可能同时使用这两种产品,但没有两种产品协同工作的例子.

Mat*_*ana 8

我在骆驼上广泛使用了IBM MQ.将两者结合使用没有问题.我将使用camel Jms Endpoint,Spring连接工厂和IBM MQ定义从我的一个spring上下文文件中粘贴一个示例配置.

骆驼路线

from("someplace")
    .to("cpaibmmq:queue:myQueueName");
Run Code Online (Sandbox Code Playgroud)

春天语境

<bean name="cpaibmmq" class="org.apache.camel.component.jms.JmsComponent" destroy-method="doStop">
    <property name="transacted" value="${jms.transacted}" />
    <property name="concurrentConsumers" value="${cpa.concurrentConsumers}" />
    <property name="maxConcurrentConsumers" value="${cpa.concurrentConsumers}" />
    <property name="acceptMessagesWhileStopping" value="${jms.acceptMessagesWhileStopping}" />
    <property name="acknowledgementModeName" value="${jms.acknowledgementModeName}" />
    <property name="cacheLevelName" value="${jms.cacheLevelName}" />
    <property name="connectionFactory" ref="ibmFac1" />
    <property name="exceptionListener" ref="ibmFac1" />
</bean>

<bean id="ibmFac1" class="org.springframework.jms.connection.SingleConnectionFactory" destroy-method="destroy">
    <constructor-arg>
        <bean class="com.ibm.mq.jms.MQQueueConnectionFactory">
            <property name="transportType" value="1" />
            <property name="channel" value="${cpa.wmq.channel}" />
            <property name="hostName" value="${cpa.wmq.hostname}" />
            <property name="port" value="${cpa.wmq.port}" />
            <property name="queueManager" value="${cpa.wmq.mqmanager}" />
        </bean>
    </constructor-arg>
</bean>
Run Code Online (Sandbox Code Playgroud)


ber*_*auz 4

我能得到的最好的记录如下,以 Spring XML 应用程序上下文为例,它本身托管 CAMEL 上下文和路由。该示例与 IBM 本机 MQ JCA 兼容的资源适配器 v7.5、CAMEL 2.16、Spring core 4.2 配合使用。我已将其部署在 Glassfish、Weblogic 和 JBoss EAP7 服务器中。

复杂性必然涉及处理 MQ 报告流,其原理与普通 JMS 回复消息的原理相冲突。详细说明请参阅通过 Camel JMS 组件使用 CoD 实现原生 websphere MQ

这个基于 CAMEL XML DSL 的示例是独立的并且易于测试。

我们从 Spring 和 CAMEL 声明开始:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util" 
xmlns:context="http://www.springframework.org/schema/context" 
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:jee="http://www.springframework.org/schema/jee"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd   
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">
Run Code Online (Sandbox Code Playgroud)

CAMEL 上下文遵循 2 条路线:MQ 到 JMS 和 JMS 到 MQ,这里链接起来形成一个桥梁以简化测试。

<camel:camelContext id="mqBridgeCtxt">
<camel:route id="mq2jms" autoStartup="true">
Run Code Online (Sandbox Code Playgroud)

奇怪:在 Weblogic 上,获取(例如)3 个侦听器的唯一方法是强制执行 3 个连接(依次使用 3 个 Camel:from 语句),每个连接最多 1 个会话,否则会出现 MQ 错误:MQJCA1018: Only one session per connection is允许。在JBoss上,你可以简单地调整concurrentConsumers=...

  <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
        acknowledgementModeName=SESSION_TRANSACTED"/> 
Run Code Online (Sandbox Code Playgroud)

上面的禁用disableReplyTo选项可确保在我们测试MQ消息类型为1=Request(-reply)或8=datagram(一种方法!)之前,CAMEL不会生成回复。此处未说明该测试和回复结构。

然后,我们在下次发布到普通 JMS 时将 EIP 强制为 InOnly,以与入站 MQ 模式保持一致。

  <camel:setExchangePattern pattern="InOnly"/>
  <!-- camel:process ref="reference to your MQ message processing bean fits here" / -->
  <camel:to uri="ref:innerQueue" />
</camel:route>
Run Code Online (Sandbox Code Playgroud)

这样就结束了 MQ 到 jms 的路由;接下来是 jms-to-MQ 路由,仍然处于相同的 CAMEL 上下文中:

<camel:route id="jms2mq"  autoStartup="true">
  <camel:from uri="ref:innerQueue" />
  <!-- remove inner message headers and properties to test without inbound side effects! -->
  <camel:removeHeaders pattern="*"/> 
  <camel:removeProperties pattern="*" />
  <!-- camel:process ref="reference to your MQ message preparation bean fits here" / -->
Run Code Online (Sandbox Code Playgroud)

现在是远程目标返回 MQ CoD 报告的请求标志。我们还强制 MQ 消息为数据报类型(值 8)。

  <camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader>
  <camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader>
  <camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader>
Run Code Online (Sandbox Code Playgroud)

ReplyTo 队列可以通过 ReplyTo uri 选项指定,也可以作为标头指定,如下所示。

接下来,我们使用 CamelJmsDestinationName 标头强制抑制 JMS MQ 消息标头 MQRFH2(使用 targetClient MQ URL 选项值 1)。换句话说,我们想要发送普通的 MQ 二进制消息(即仅包含 MQMD 消息描述符,后跟有效负载)。

  <camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader>
  <camel:setHeader headerName="CamelJmsDestinationName"> <camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader>
Run Code Online (Sandbox Code Playgroud)

更多 MQMD 字段可以通过保留的 JMS 属性进行控制,如下所示。请参阅 IBM 文档中的限制。

  <camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR   </camel:constant></camel:setHeader>
  <camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader>
Run Code Online (Sandbox Code Playgroud)

URI 中的目标队列被上面的 CamelJmsDestinationName 覆盖,因此 URI 中的队列名称成为占位符。

正如所观察到的,URI 选项preserveMessageQos 允许发送一条带有已设置的ReplyTo 数据的消息(以获取MQ CoD 报告),但通过强制执行InOnly MEP 来阻止CAMEL 实例化Reply 消息侦听器。

  <camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&amp;
            exchangePattern=InOnly&amp;preserveMessageQos=true&amp;
            includeSentJMSMessageID=true" />
</camel:route>
</camel:camelContext>
Run Code Online (Sandbox Code Playgroud)

我们还没有完成,我们仍然需要为本机 JMS 提供程序和 Websphere MQ(通过本机 IBM WMQ JCA 资源适配器)声明我们的队列工厂,以便根据您的上下文进行调整。我们在这里对管理对象使用 JNDI 查找。

<camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue">
</camel:endpoint>

<jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/>
<jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/>

<bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="jmsraQCFBean" />
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="mqQCFBean" />
</bean>

</beans>
Run Code Online (Sandbox Code Playgroud)

从 JNDI 获取工厂(和 JCA 适配器)的另一种方法是将 JMS 客户端声明为 Spring bean。在 Weblogic 和 Glassfish 中,通过部署本机 IBM JCA 资源适配器并创建 JNDI 资源,然后在 Spring 上下文中引用(如上所述),您会得到更好的启发,在 JBoss 中,直接 MQ 客户端 bean 声明最适合,如下所示)

<bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory">
    <property name="hostName" value="${mqHost}"/>
    <property name="port" value="${mqPort}"/>
    <property name="queueManager" value="${mqQueueManager}"/>
    <property name="channel" value="${mqChannel}"/>
    <property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries -->
    <property name="appName" value="${connectionName}"/>
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="mqCFBean"/>
    <property name="transacted" value="true"/>
    <property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>
Run Code Online (Sandbox Code Playgroud)

欢迎提出意见和改进。