Java HornetQ客户端中的线程处理

Mar*_*sse 9 java multithreading thread-safety hornetq

我试图了解如何处理连接到HornetQ的Java客户端中的线程.我没有得到一个特定的错误,但没有理解我是如何预期首先处理线程(关于HornetQ客户端,特别是MessageHandler.onMessage()- 线程一般对我没有问题).

如果这是相关的:我正在使用'org.hornetq:hornetq-server:2.4.7.Final'嵌入我的应用程序的服务器.我不打算这有所作为.在我的情况下,从操作角度来看,这比运行独立服务器进程更方便.

到目前为止我做了什么:

  1. 创建嵌入式服务器: new EmbeddedHornetQ(), .setConfiguration()

  2. 创建服务器定位器: HornetQClient.createServerLocator(false, new TransportConfiguration(InVMConnectorFactory.class.getName()))

  3. 创建一个会话工厂: serverLocator.createSessionFactory()

现在看来很明显,我认为我可以通过创建一个会话hornetqClientSessionFactory.createSession(),创建该会话的生产者和消费者,并使用一个线程内的信息处理.send().receive().

但我也发现了consumer.setMessageHandler(),这告诉我,我根本不理解客户端中的线程.我尝试使用它,但随后消费者调用messageHandler.onMessage()两个与创建会话的线程不同的线程.这看起来与我看到代码的印象相符 - HornetQ客户端使用线程池来发送消息.

这让我感到困惑.javadocs说会话是a "single-thread object",并且代码同意 - 那里没有明显的同步.但是onMessage()在多个线程中调用时,message.acknowledge()也会在多个线程中调用,而那个线程只会委托给会话. 这应该怎么样?如何查看哪个MessageHandler不从多个线程访问会话?

更进一步,我如何从onMessage()中发送后续消息?我正在使用HornetQ作为持久的"待办事项"工作队列,因此发送后续消息对我来说是一个典型的用例.但同样,在内部onMessage(),我在访问会话的错误线程中.

请注意,我可以远离MessageHandler,只是以send() / receive()允许我控制线程的方式使用.但我确信我根本不了解整个情况,而且结合多线程只是在寻找麻烦.

小智 1

我可以回答你的部分问题,尽管我希望你现在已经解决了这个问题。

形成ClientConsumer 上的 HornetQ 文档(重点是我的):

ClientConsumer 从 HornetQ 队列接收消息。
可以使用 receive() 方法同步使用消息,该方法将阻塞直到收到消息(或超时到期),也可以通过设置 MessageHandler 异步使用消息。
这两种类型的消费是互斥的:如果调用了具有 MessageHandler 集的 ClientConsumer 的 receive() 方法,则会抛出 HornetQException。

因此,您在处理消息接收方面有两种选择:

  1. 自己同步接收
    • 不向 HornetQ提供MessageListener
    • 在您自己的消费者线程中,调用.receive().receive(long itmeout)在您闲暇时
    • ClientMessage检索 调用返回的(可选)对象
      • 优点:使用Session您希望在 Consumer 中携带的内容,您可以根据需要转发消息
      • 缺点:所有这些消息处理都是连续的
  2. 将线程同步委托给HornetQ
    • 不要调用消费者 .receive()
    • 提供 MessageListener 实现onMessage(ClientMessage)
      • 优点:所有消息处理都是并发的、快速的、无麻烦的
      • 缺点:我认为不可能Session从该对象中检索 ,因为它没有由接口公开。
    • 未经测试的解决方法:在我的应用程序(与您的应用程序一样在虚拟机中)中,我将底层的线程安全QueueConnection公开为应用程序范围内可用的静态变量。从您的 MessageListener 中,您可以调用QueueSession jmsSession = jmsConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);它来获取新的 Session 并从中发送消息...据我所知,这可能没问题,因为 Session 对象并未真正重新创建。我这样做也是因为 Sessions 有变得陈旧的趋势。

我认为你不应该那么想要掌控一切执行线程,尤其是仅转发消息的瞬态线程。正如您所猜测的,HornetQ 具有内置线程池,并且可以有效地重用这些对象。

另外,正如您所知,您不需要在单个线程中访问对象(如队列),因此如果通过多个线程甚至通过多个会话访问队列并不重要。您只需确保会话只能由一个线程访问,这是 MessageListener 的设计初衷。