使用CoD over Camel JMS组件实现本机websphere MQ

ber*_*auz 8 jms apache-camel ibm-mq

我在使用Apache CAMEL实现Websphere MQ(WMQ)连接器时遇到了很多困难,该连接器可以无异常地处理MQ确认传递(CoD)报告,也不会以不需要的响应数据报的形式出现副作用.最后,我按照我想要的方式工作,如果您习惯于编写本机MQ客户端,那么这是一种非常标准和通用的方式.我在同一主题的帖子中记录了该方法,但我发现解决方案臃肿而复杂,并且非常感谢任何建议或示例,以使实现更清晰,更优雅.

我理解这个问题的根源在于MQ设计请求 - 回复消息交换模式(MEP)的方式,而不是JMS规范的方式,而不是其JMS组件中的请求 - 应答MEP的CAMEL实现.三种不同的哲学!

  1. WMQ具有MessageType标头(请参阅MQMD字段常量),其中请求值为1,回复为2,数据报为8(单向MEP).此外,值4用于以CoD(交付配置),PAN(肯定确认)和NAN(否定确认)的形式标记报告消息,其中 - 在消息流方面 - 也进行额外的回复信息.可以使用另一个名为"Report"的标题字段为Request消息,回复或数据报请求CoD,PAN和NAN ack,其中可以组合所有报告变体的标志.其他标头字段'ReplyToQ'和'ReplyToQMgr'指定原始发件人期望报告和回复的队列和队列管理器,以及固定的24字节'CorrelId'字段 - 可选 - 可以帮助将报告和回复与原始数据报或请求消息相关联.为了使其更复杂,实际上可以使用相同的原始消息ID和没有CorrelID发回回复和报告,或者在CorrelId中提供原始消息ID,或者在原始请求或数据报中已经指定时返回CorrelId值.IBM 通过WMQ提供JMS API,允许通过WMQ将纯JMS交换作为传输(在额外消息头名称MQRFH2的帮助下),或将本机MQ消息映射到JMS消息,反之亦然.
  2. 另一方面,JMS规范提供了一个可选的'JMSReplyTo'头字段和一个'JMSCorrelationID',但是确实将MEP语义保留给客户端应用程序; 即在规格中陈述:" 答复可能是可选的;由客户自行决定. "
  3. CAMEL以XML或Java DSL中的"路由"和内部Exchange对象模型为特征,目的是支持EIP模式,其中包括Request-Reply模式.然后CAMEL在其JMS组件中假定如果设置了JMSReplyTo字段,则这必然是期望回复的请求,导致Exchange的Out部分(或者如果Out为空,则部分修改)返回到JMSReplyTo中定义的队列. .

我愿意通过远程Websphere队列管理器支持本地MQ消息与交付确认(CoD)报告的交换,这样除了事务和持久性(即没有丢失,没有重复)之外,我还可以跟踪消息何时被消费并在发生延误时提出警报.

入境问题:

默认情况下,Websphere队列管理器在队列中的消息消耗完成时生成CoD报告.因此,在没有任何特定设置的情况下,当CAMEL端点消耗消息时,远程MQ客户端发送带有CoD标志的数据报(以及当时强制的ReplyToQ)将从队列管理器获得第一个回复作为MQ报告,然后是第二个(意外的)回复消息由CAMEL显式返回并包含CAMEL路由末尾的Exchange对象中剩余的内容,因为CAMEL假定存在JMSReplyTo字段的请求 - 应答EIP(从MQ ReplyToQ和ReplyToQMgr映射)被要求支持CoD回流).

出境问题:

如果没有特定设置,CAMEL默认也会在出站连接上采用请求 - 回复EIP/MEP.然后,CAMEL JMS/MQ端点将等待1个响应.当OUTbound消息是MQ上的JMS(因此具有MQRFH2头)时,这可以正常工作.当强制普通的vanilla MQ,即删除下面的MQRFH2头时,我无法使端点侦听器与相关的传入MQ报告匹配,尽管跟踪值看起来都是正确的(强制执行24个char相关ID以便截断更长的CorrelId值或空填充通过MQ无法对相关过滤器进行地理化处理).有没有人能够解决这个问题?

详细信息:虽然IBM JMS API接受传递特定的JMS属性值WMQ_MESSAGE_BODY = {1 | 0}/WMQ_TARGET_CLIENT = {1 | 0}来控制生成的消息中是否存在JMS头MQRFH2,但这些选项通过CAMEL变得不可用.必须使用CamelJmsDestinationName标头(如CAMEL JMS doc中所述)为具有选项"targetClient = 1"的目标提供IBM队列URL,以便摆脱MQRFH2标头.但是如果没有此标头,CoD报告或MQ回复上的CAMEL关联确实会失败.

上述问题的解决方案确实是建立一个特定的CAMEL路由来处理远程方返回的CoD报告(以及相关的MQ回复).因此,CAMEL出站消息必须强制为" InOnly " ExchangePattern,因此不要等待任何回复.但这会导致CAMEL过度抑制所有的ReplyTo字段.然后,如果在出站MQ数据报上请求了MQ CoD,则会发生CAMEL异常,其原因是MQException JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2027' ('MQRC_MISSING_REPLY_TO_Q').

CAMEL记录了一个URI选项'disableReplyTo = true'来禁用回复模式的回复 - 同时保留ReplyTo字段 - 显然在入站和出站交换中.但是这个选项在出站JMS交换中不起作用(如所观察到的,我可能是错的),而且必须使用更不直观的"preserveMessageQos"选项.

欢迎这些问题的优雅解决方案.

ber*_*auz 5

下面记录了我能够获得的最好的结果,说明为Spring XML应用程序上下文,它本身承载CAMEL上下文和路由.它适用于IBM本机MQ JCA兼容资源适配器v7.5,CAMEL 2.15,Spring core 4.2.我可以将它部署到Glassfish和Weblogic服务器.

当然,在给定众多变量的情况下,在实际实现中使用java DSL.这个基于CAMEL XML DSL的示例是自包含且易于测试的.

我们从Spring和Camel声明开始:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd   
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">
Run Code Online (Sandbox Code Playgroud)

CAMEL上下文有两条路径:MQ到JMS和JMS到MQ,这里链接形成一个桥梁以便于测试.

<camel:camelContext id="mqBridgeCtxt">

  <camel:route id="mq2jms" autoStartup="true">
Run Code Online (Sandbox Code Playgroud)

很奇怪:当使用本机MQ资源适配器时,获取(例如)3个侦听器的唯一方法是强制执行3个连接(3个Camel:来自语句中的语句),每个连接最多1个会话,否则会发生MQ错误:MQJCA1018: Only one session per connection is allowed.但是,如果您使用MQ客户端jar,那么CAMEL JMS中的concurentConsumers选项确实可以正常工作.

    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/> 
    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/> 
    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/> 
Run Code Online (Sandbox Code Playgroud)

上面的disable disableReplyTo选项确保在我们可以测试MQ消息类型为1 = Request(-reply)或8 = datagram(单向!)之前CAMEL不会产生回复.这里没有说明测试和回复结构.

然后我们在下一个发布到普通JMS时强制执行EIP到InOnly以与入站MQ模式保持一致.

    <camel:setExchangePattern pattern="InOnly"/>
    <!-- camel:process ref="reference to your MQ message processing bean fits here" / -->
    <camel:to uri="ref:innerQueue" />
  </camel:route>
Run Code Online (Sandbox Code Playgroud)

接下来是jms-to-MQ路线:

  <camel:route id="jms2mq"  autoStartup="true">
    <camel:from uri="ref:innerQueue" />
    <!-- remove inner message headers and properties to test without inbound side effects! -->
    <camel:removeHeaders pattern="*"/> 
    <camel:removeProperties pattern="*" />
    <!-- camel:process ref="reference to your MQ message preparation bean fits here" / -->
Run Code Online (Sandbox Code Playgroud)

现在出现了远程目标返回的MQ CoD报告的请求标志.我们还将MQ消息强制为Datagram类型(值8).

    <camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader>
    <camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader>
    <camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader>
Run Code Online (Sandbox Code Playgroud)

ReplyTo队列可以通过ReplyTo uri选项指定,否则作为标题指定如下.

接下来,我们使用CamelJmsDestinationName标头来强制抑制JMS MQ消息头MQRFH2(使用targetClient MQ URL选项值1).换句话说,我们希望发送普通的vanilla MQ二进制消息(即只有MQMD消息描述符后跟有效负载).

    <camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader>
    <camel:setHeader headerName="CamelJmsDestinationName"><camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader>
Run Code Online (Sandbox Code Playgroud)

可以通过保留的JMS属性来控制更多MQMD字段,如下所示.请参阅IBM doc中的限制.

    <camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR   </camel:constant></camel:setHeader>
    <camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader>
Run Code Online (Sandbox Code Playgroud)

URI中的目标队列被上面的CamelJmsDestinationName覆盖,因此URI中的队列名称将成为占位符.

URI选项preserveMessageQos是 - 如所观察到的 - 允许发送带有设置的ReplyTo数据的消息(以获取MQ CoD报告),但是通过强制执行InOnly MEP来阻止CAMEL实例化Reply消息监听器.

    <camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&amp;
                exchangePattern=InOnly&amp;preserveMessageQos=true&amp;
                includeSentJMSMessageID=true" />
  </camel:route>
</camel:camelContext>
Run Code Online (Sandbox Code Playgroud)

必须根据您的具体情况调整以下内容.它为本机JMS提供程序和Websphere MQ(通过本机IBM WMQ JCA资源适配器)提供队列工厂.我们在这里使用管理对象的JNDI查找.

<camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue">
</camel:endpoint>

<jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/>
<jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/>

<bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="jmsraQCFBean" />
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="mqQCFBean" />
</bean>

</beans>
Run Code Online (Sandbox Code Playgroud)

或者,如果您使用MQ客户端jar而不是资源适配器,则将声明连接工厂bean(代替上面的JNDI查找):

<bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory">
    <property name="hostName" value="${mqHost}"/>
    <property name="port" value="${mqPort}"/>
    <property name="queueManager" value="${mqQueueManager}"/>
    <property name="channel" value="${mqChannel}"/>
    <property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries -->
    <property name="appName" value="${connectionName}"/>
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="mqCFBean"/>
    <property name="transacted" value="true"/>
    <property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>
Run Code Online (Sandbox Code Playgroud)

欢迎评论和改进.