JMS - 从一个消费者到多个消费者

Gon*_*gui 24 java messaging jms message-queue

我有一个JMS客户端,它生成消息并通过JMS队列发送给其唯一的消费者.

我想要的是不止一个消费者获得这些消息.我想到的第一件事就是将队列转换为主题,因此当前和新的消费者可以订阅并获得传递给所有消息的相同消息.

这显然涉及在生产者和消费者方面修改当前客户端代码.

我还想看看其他选项,比如创建第二个队列,这样我就不必修改现有的消费者了.我相信这种方法有一些优点(如果我错了,请纠正我)平衡两个不同队列之间的负载而不是一个,这可能会对性能产生积极影响.

我想就你可能会看到的这些选项和缺点/专业人士提出建议.任何反馈都非常感谢.

And*_*ell 49

你有一些选择,如你所说.

如果将其转换为主题以获得相同的效果,则需要使消费者成为持久消费者.如果您的消费者不活着,那么队列提供的一件事就是持久性.这取决于您使用的MQ系统.

如果您想坚持使用队列,您将为每个使用者创建一个队列,并为将侦听原始队列的调度程序创建一个队列.

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3
Run Code Online (Sandbox Code Playgroud)

主题优点

  • 更容易动态添加新消费者.所有消费者都会收到新消息而无需任何工作.
  • 您可以创建循环主题,以便Consumer_1将收到消息,然后是Consumer_2,然后是Consumer_3
  • 消费者可以推送新消息,而不必查询队列,使其成为被动的.

主题的缺点

  • 除非您的代理支持此配置,否则消息不会持久.如果消费者下线并返回,除非设置了持久消费者,否则可能会丢失消息.
  • 很难让Consumer_1和Consumer_2接收消息而不是Consumer_3.使用Dispatcher和Queues,Dispatcher无法将消息放入Consumer_3的队列中.

队列的优点

  • 在消费者删除消息之前,消息是持久的
  • 调度程序可以通过不将消息放入相应的消费者队列来过滤哪些消费者获得哪些消息.这可以通过过滤器来完成主题.

队列的缺点

  • 需要创建其他队列以支持多个消费者.在动态环境中,这不会有效.

在开发消息传递系统时,我更喜欢主题,因为它给了我最大的力量,但看到你已经在使用队列,它将要求你改变你的系统如何工作来实现主题.

具有多个消费者的队列系统的设计与实现

Producer -> Queue_Original <- Dispatcher -> Queue_Consumer_1 <- Consumer_1
                                         -> Queue_Consumer_2 <- Consumer_2
                                         -> Queue_Consumer_3 <- Consumer_3
Run Code Online (Sandbox Code Playgroud)

资源

请记住,还有其他事情你需要处理,如问题异常处理,重新连接到连接和队列,如果你失去了连接,等等.这只是为了让你了解如何完成我的事情描述.

在一个真实的系统中,我可能不会在第一个异常时退出.我会允许系统继续尽可能地运行并记录错误.因为它在此代码中如果将消息放入单个消费者队列中失败,整个调度程序将停止.

Dispatcher.java

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package stackoverflow_4615895;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

public class Dispatcher {

    private static long QUEUE_WAIT_TIME = 1000;
    private boolean mStop = false;
    private QueueConnectionFactory mFactory;
    private String mSourceQueueName;
    private String[] mConsumerQueueNames;

    /**
     * Create a dispatcher
     * @param factory
     *      The QueueConnectionFactory in which new connections, session, and consumers
     *      will be created. This is needed to ensure the connection is associated
     *      with the correct thread.
     * @param source
     *
     * @param consumerQueues
     */
    public Dispatcher(
        QueueConnectionFactory factory, 
        String sourceQueue, 
        String[] consumerQueues) {

        mFactory = factory;
        mSourceQueueName = sourceQueue;
        mConsumerQueueNames = consumerQueues;
    }

    public void start() {
        Thread thread = new Thread(new Runnable() {

            public void run() {
                Dispatcher.this.run();
            }
        });
        thread.setName("Queue Dispatcher");
        thread.start();
    }

    public void stop() {
        mStop = true;
    }

    private void run() {

        QueueConnection connection = null;
        MessageProducer producer = null;
        MessageConsumer consumer = null;
        QueueSession session = null;
        try {
            // Setup connection and queues for receiving the messages
            connection = mFactory.createQueueConnection();
            session = connection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            Queue sourceQueue = session.createQueue(mSourceQueueName);
            consumer = session.createConsumer(sourceQueue);

            // Create a null producer allowing us to send messages
            // to any queue.
            producer = session.createProducer(null);

            // Create the destination queues based on the consumer names we
            // were given.
            Queue[] destinationQueues = new Queue[mConsumerQueueNames.length];
            for (int index = 0; index < mConsumerQueueNames.length; ++index) {
                destinationQueues[index] = session.createQueue(mConsumerQueueNames[index]);
            }

            connection.start();

            while (!mStop) {

                // Only wait QUEUE_WAIT_TIME in order to give
                // the dispatcher a chance to see if it should
                // quit
                Message m = consumer.receive(QUEUE_WAIT_TIME);
                if (m == null) {
                    continue;
                }

                // Take the message we received and put
                // it in each of the consumers destination
                // queues for them to process
                for (Queue q : destinationQueues) {
                    producer.send(q, m);
                }
            }

        } catch (JMSException ex) {
            // Do wonderful things here 
        } finally {
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException ex) {
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException ex) {
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ex) {
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

Main.java

    QueueConnectionFactory factory = ...;

    Dispatcher dispatcher =
            new Dispatcher(
            factory,
            "Queue_Original",
            new String[]{
                "Consumer_Queue_1",
                "Consumer_Queue_2",
                "Consumer_Queue_3"});
    dispatcher.start();
Run Code Online (Sandbox Code Playgroud)


ska*_*man 5

您可能不必修改代码;这取决于你怎么写。

例如,如果您的代码使用MessageProducer而不是发送消息QueueSender,那么它将适用于主题和队列。同样,如果您使用MessageConsumer而不是QueueReceiver.

本质上,在 JMS 应用程序中使用非特定接口与 JMS 系统交互是一种很好的做法,例如MessageProducerMessageConsumerDestination等。如果是这样的话,那么它“仅仅是”配置问题。