ber*_*auz 8 jms apache-camel ibm-mq
我在使用Apache CAMEL实现Websphere MQ(WMQ)连接器时遇到了很多困难,该连接器可以无异常地处理MQ确认传递(CoD)报告,也不会以不需要的响应数据报的形式出现副作用.最后,我按照我想要的方式工作,如果您习惯于编写本机MQ客户端,那么这是一种非常标准和通用的方式.我在同一主题的帖子中记录了该方法,但我发现解决方案臃肿而复杂,并且非常感谢任何建议或示例,以使实现更清晰,更优雅.
我理解这个问题的根源在于MQ设计请求 - 回复消息交换模式(MEP)的方式,而不是JMS规范的方式,而不是其JMS组件中的请求 - 应答MEP的CAMEL实现.三种不同的哲学!
我愿意通过远程Websphere队列管理器支持本地MQ消息与交付确认(CoD)报告的交换,这样除了事务和持久性(即没有丢失,没有重复)之外,我还可以跟踪消息何时被消费并在发生延误时提出警报.
默认情况下,Websphere队列管理器在队列中的消息消耗完成时生成CoD报告.因此,在没有任何特定设置的情况下,当CAMEL端点消耗消息时,远程MQ客户端发送带有CoD标志的数据报(以及当时强制的ReplyToQ)将从队列管理器获得第一个回复作为MQ报告,然后是第二个(意外的)回复消息由CAMEL显式返回并包含CAMEL路由末尾的Exchange对象中剩余的内容,因为CAMEL假定存在JMSReplyTo字段的请求 - 应答EIP(从MQ ReplyToQ和ReplyToQMgr映射)被要求支持CoD回流).
如果没有特定设置,CAMEL默认也会在出站连接上采用请求 - 回复EIP/MEP.然后,CAMEL JMS/MQ端点将等待1个响应.当OUTbound消息是MQ上的JMS(因此具有MQRFH2头)时,这可以正常工作.当强制普通的vanilla MQ,即删除下面的MQRFH2头时,我无法使端点侦听器与相关的传入MQ报告匹配,尽管跟踪值看起来都是正确的(强制执行24个char相关ID以便截断更长的CorrelId值或空填充通过MQ无法对相关过滤器进行地理化处理).有没有人能够解决这个问题?
详细信息:虽然IBM JMS API接受传递特定的JMS属性值WMQ_MESSAGE_BODY = {1 | 0}/WMQ_TARGET_CLIENT = {1 | 0}来控制生成的消息中是否存在JMS头MQRFH2,但这些选项通过CAMEL变得不可用.必须使用CamelJmsDestinationName标头(如CAMEL JMS doc中所述)为具有选项"targetClient = 1"的目标提供IBM队列URL,以便摆脱MQRFH2标头.但是如果没有此标头,CoD报告或MQ回复上的CAMEL关联确实会失败.
上述问题的解决方案确实是建立一个特定的CAMEL路由来处理远程方返回的CoD报告(以及相关的MQ回复).因此,CAMEL出站消息必须强制为" InOnly " ExchangePattern,因此不要等待任何回复.但这会导致CAMEL过度抑制所有的ReplyTo字段.然后,如果在出站MQ数据报上请求了MQ CoD,则会发生CAMEL异常,其原因是MQException JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2027' ('MQRC_MISSING_REPLY_TO_Q').
CAMEL记录了一个URI选项'disableReplyTo = true'来禁用回复模式的回复 - 同时保留ReplyTo字段 - 显然在入站和出站交换中.但是这个选项在出站JMS交换中不起作用(如所观察到的,我可能是错的),而且必须使用更不直观的"preserveMessageQos"选项.
欢迎这些问题的优雅解决方案.
下面记录了我能够获得的最好的结果,说明为Spring XML应用程序上下文,它本身承载CAMEL上下文和路由.它适用于IBM本机MQ JCA兼容资源适配器v7.5,CAMEL 2.15,Spring core 4.2.我可以将它部署到Glassfish和Weblogic服务器.
当然,在给定众多变量的情况下,在实际实现中使用java DSL.这个基于CAMEL XML DSL的示例是自包含且易于测试的.
我们从Spring和Camel声明开始:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:jee="http://www.springframework.org/schema/jee"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">
Run Code Online (Sandbox Code Playgroud)
CAMEL上下文有两条路径:MQ到JMS和JMS到MQ,这里链接形成一个桥梁以便于测试.
<camel:camelContext id="mqBridgeCtxt">
<camel:route id="mq2jms" autoStartup="true">
Run Code Online (Sandbox Code Playgroud)
很奇怪:当使用本机MQ资源适配器时,获取(例如)3个侦听器的唯一方法是强制执行3个连接(3个Camel:来自语句中的语句),每个连接最多1个会话,否则会发生MQ错误:MQJCA1018: Only one session per connection is allowed.但是,如果您使用MQ客户端jar,那么CAMEL JMS中的concurentConsumers选项确实可以正常工作.
<camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&disableReplyTo=true&
acknowledgementModeName=SESSION_TRANSACTED"/>
<camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&disableReplyTo=true&
acknowledgementModeName=SESSION_TRANSACTED"/>
<camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&disableReplyTo=true&
acknowledgementModeName=SESSION_TRANSACTED"/>
Run Code Online (Sandbox Code Playgroud)
上面的disable disableReplyTo选项确保在我们可以测试MQ消息类型为1 = Request(-reply)或8 = datagram(单向!)之前CAMEL不会产生回复.这里没有说明测试和回复结构.
然后我们在下一个发布到普通JMS时强制执行EIP到InOnly以与入站MQ模式保持一致.
<camel:setExchangePattern pattern="InOnly"/>
<!-- camel:process ref="reference to your MQ message processing bean fits here" / -->
<camel:to uri="ref:innerQueue" />
</camel:route>
Run Code Online (Sandbox Code Playgroud)
接下来是jms-to-MQ路线:
<camel:route id="jms2mq" autoStartup="true">
<camel:from uri="ref:innerQueue" />
<!-- remove inner message headers and properties to test without inbound side effects! -->
<camel:removeHeaders pattern="*"/>
<camel:removeProperties pattern="*" />
<!-- camel:process ref="reference to your MQ message preparation bean fits here" / -->
Run Code Online (Sandbox Code Playgroud)
现在出现了远程目标返回的MQ CoD报告的请求标志.我们还将MQ消息强制为Datagram类型(值8).
<camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader>
<camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader>
<camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader>
Run Code Online (Sandbox Code Playgroud)
ReplyTo队列可以通过ReplyTo uri选项指定,否则作为标题指定如下.
接下来,我们使用CamelJmsDestinationName标头来强制抑制JMS MQ消息头MQRFH2(使用targetClient MQ URL选项值1).换句话说,我们希望发送普通的vanilla MQ二进制消息(即只有MQMD消息描述符后跟有效负载).
<camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader>
<camel:setHeader headerName="CamelJmsDestinationName"><camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader>
Run Code Online (Sandbox Code Playgroud)
可以通过保留的JMS属性来控制更多MQMD字段,如下所示.请参阅IBM doc中的限制.
<camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR </camel:constant></camel:setHeader>
<camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader>
Run Code Online (Sandbox Code Playgroud)
URI中的目标队列被上面的CamelJmsDestinationName覆盖,因此URI中的队列名称将成为占位符.
URI选项preserveMessageQos是 - 如所观察到的 - 允许发送带有设置的ReplyTo数据的消息(以获取MQ CoD报告),但是通过强制执行InOnly MEP来阻止CAMEL实例化Reply消息监听器.
<camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&
exchangePattern=InOnly&preserveMessageQos=true&
includeSentJMSMessageID=true" />
</camel:route>
</camel:camelContext>
Run Code Online (Sandbox Code Playgroud)
必须根据您的具体情况调整以下内容.它为本机JMS提供程序和Websphere MQ(通过本机IBM WMQ JCA资源适配器)提供队列工厂.我们在这里使用管理对象的JNDI查找.
<camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue">
</camel:endpoint>
<jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/>
<jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/>
<bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory" ref="jmsraQCFBean" />
</bean>
<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory" ref="mqQCFBean" />
</bean>
</beans>
Run Code Online (Sandbox Code Playgroud)
或者,如果您使用MQ客户端jar而不是资源适配器,则将声明连接工厂bean(代替上面的JNDI查找):
<bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory">
<property name="hostName" value="${mqHost}"/>
<property name="port" value="${mqPort}"/>
<property name="queueManager" value="${mqQueueManager}"/>
<property name="channel" value="${mqChannel}"/>
<property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries -->
<property name="appName" value="${connectionName}"/>
</bean>
<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory" ref="mqCFBean"/>
<property name="transacted" value="true"/>
<property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>
Run Code Online (Sandbox Code Playgroud)
欢迎评论和改进.
| 归档时间: |
|
| 查看次数: |
3325 次 |
| 最近记录: |