如何在两个应用程序之间发布/订阅JMS消息?

ton*_*nga 2 java spring activemq-classic jms

我有两个Java独立应用程序.我想从一个应用程序发送消息并由两个客户端异步接收消息:一个与发送者在同一个应用程序中.另一种是在不同的应用程序中.两者都与ActiveMQ代理连接.但我只能看到第一个客户端收到消息而另一个客户端没有收到消息.通过JMS连接两个应用程序的一般方法是什么?我想我必须对JMS有一些不清楚的概念.我查了一下,但无法弄清楚如何设置我的Spring bean配置文件以在两个Java应用程序之间发布/订阅消息.

这是我的发送者在第一个应用程序中的bean配置文件,也是同一个应用程序中的第一个监听器bean:

<bean id="customerMessageSender" class="com.example.message.CustomerStatusSender">
    <property name="jmsTemplate" ref="jsmTemplateBean" />
    <property name="topic" ref="topicBean" />
</bean>

<bean id="jsmTemplateBean" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactoryBean"/>
    <property name="pubSubDomain" value="true"/>
</bean>

<bean id="topicBean" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="CustomerStatusTopic" />
</bean>

<bean id="connectionFactoryBean" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616" />
</bean>

<bean id="customerStatusListener" class="com.example.message.CustomerStatusListener" />


<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactoryBean" />
    <property name="destination" ref="topicBean" />
    <property name="messageListener" ref="customerStatusListener" />
</bean>
Run Code Online (Sandbox Code Playgroud)

以下是第二个侦听器的bean配置文件,该文件位于不同的应用程序中:

<bean id="topicBean" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="CustomerStatusTopic" />
</bean>

<bean id="connectionFactoryBean" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616" />
</bean>

<bean id="anotherCustomerStatusListener" class="com.mydomain.jms.CustomerStatusMessageListener" />

<bean id="listenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactoryBean" />
    <property name="destination" ref="topicBean" />
    <property name="messageListener" ref="anotherCustomerStatusListener" />
</bean> 
Run Code Online (Sandbox Code Playgroud)

如您所见,customerStatusListenerbean和anotherCustomerStatusListenerbean都订阅了topicBean.但只有第一个侦听器才能获取消息,因为它与发送方位于同一应用程序中,而第二个侦听器则不然.通过JMS连接两个Java应用程序的一般原则是什么,以便可以在两个单独的应用程序之间发送/接收消息?

编辑:我无法在第一个XML文件中添加以下侦听器bean,因为该类CustomerStatusMessageListener位于不同的应用程序中,因此在第一个(发送者)应用程序的类路径中不可见.

<bean id="anotherCustomerStatusListener" class="com.mydomain.jms.CustomerStatusMessageListener" />
Run Code Online (Sandbox Code Playgroud)

再次编辑:以下是第二个应用程序中与第一个应用程序分开的第二个侦听器.它包含一个main实例化侦听器bean 的方法(jms-beans.xml是上面列出的第二个侦听器的bean配置文件).

public class CustomerStatusMessageListener implements MessageListener {
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println("Subscriber 2 got you! The message is: "
                        + ((TextMessage) message).getText());
            } catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        } else {
            throw new IllegalArgumentException(
                    "Message must be of type TextMessage");
        }
    }

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("jms-beans.xml");
        CustomerStatusMessageListener messageListener = (CustomerStatusMessageListener) context.getBean("anotherCustomerStatusListener");
        context.close();
    }
}
Run Code Online (Sandbox Code Playgroud)

lre*_*der 6

您对JMS的理解是正确的.如果您希望两个侦听器接收相同的消息,则可以使用主题.如果一个侦听器与发送方在同一个VM中运行而另一个不在,则无关紧要.没有看到您的代码,您的Spring配置看起来也是正确的.还有一些事情要检查:

  • 两个侦听器都在同一主机上运行吗?也就是说,localhost一个听众是一个地方,另一个听众是另一个地方?
  • 您的第二个监听器是在发送消息时运行的吗?如果在发送消息时您的第二个侦听器未处于活动状态,则无法查看它是否稍后启动,除非您使用的是持久主题并且您的订阅者已至少连接到代理一次.

根据您的意见,第二项是您遇到问题的地方.

博客文章介绍了如何设置持久主题(如果需要通过代理重新启动来保留消息,则保留消息).基本上,将此配置添加到您的消息侦听器:

<property name="subscriptionDurable" value="true">
<property name="clientId" value="Some_unique_id">
<property name="durableSubscriptionName" value="Some_unique_id">
Run Code Online (Sandbox Code Playgroud)

每个订阅者的客户端ID和持久订阅名称必须不同,因此您的第一个侦听器将具有以下内容:

<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactoryBean" />
        <property name="destination" ref="topicBean" />
        <property name="messageListener" ref="anotherCustomerStatusListener" />
        <property name="subscriptionDurable" value="true">
        <property name="clientId" value="listener1">
        <property name="durableSubscriptionName" value="listener1">
</bean> 
Run Code Online (Sandbox Code Playgroud)

第二应该有:

      <bean id="listenerContainer"
           class="org.springframework.jms.listener.DefaultMessageListenerContainer">
           <property name="connectionFactory" ref="connectionFactoryBean" />
           <property name="destination" ref="topicBean" />
           <property name="messageListener" ref="anotherCustomerStatusListener" />
           <property name="subscriptionDurable" value="true">
           <property name="clientId" value="listener2">
           <property name="durableSubscriptionName" value="listener2">
       </bean> 
Run Code Online (Sandbox Code Playgroud)

请注意,您必须至少启动第二个侦听器以向代理注册,以便代理知道其clientId并为其存储消息,但您可以将其关闭并稍后启动它以获取它错过的任何消息下.

如果您的侦听器在高容量系统上长时间停留,则代理将为其存储所有消息,这可能最终填满磁盘或降低代理速度.请参阅ActiveMQ文档以自动删除持久消息.

  • 您的主要实例化弹簧上下文,然后立即退出.我怀疑是否有足够的时间让听众收到消息,甚至可能没有足够的时间让它注册ActiveMQ.在关闭spring上下文并退出main之前,尝试使用几十秒左右的"Thread.sleep()"让你的第二个监听器保持活动一段时间. (2认同)
  • 导致延迟的最可能原因是设置从侦听器到代理的初始连接的时间,以及代理在发送消息之前所做的任何设置和验证.设置初始连接后,您应该看到消息生成和消息消耗之间的延迟较少.在类似于你的情况下,ActiveMQ声称2000条消息/秒.请参见http://activemq.apache.org/performance.html. (2认同)