Mar*_*sse 9 java multithreading thread-safety hornetq
我试图了解如何处理连接到HornetQ的Java客户端中的线程.我没有得到一个特定的错误,但没有理解我是如何预期首先处理线程(关于HornetQ客户端,特别是MessageHandler.onMessage()- 线程一般对我没有问题).
如果这是相关的:我正在使用'org.hornetq:hornetq-server:2.4.7.Final'嵌入我的应用程序的服务器.我不打算这有所作为.在我的情况下,从操作角度来看,这比运行独立服务器进程更方便.
到目前为止我做了什么:
创建嵌入式服务器: new EmbeddedHornetQ(),
.setConfiguration()
创建服务器定位器: HornetQClient.createServerLocator(false, new TransportConfiguration(InVMConnectorFactory.class.getName()))
serverLocator.createSessionFactory()现在看来很明显,我认为我可以通过创建一个会话hornetqClientSessionFactory.createSession(),创建该会话的生产者和消费者,并使用一个线程内的信息处理.send()和.receive().
但我也发现了consumer.setMessageHandler(),这告诉我,我根本不理解客户端中的线程.我尝试使用它,但随后消费者调用messageHandler.onMessage()两个与创建会话的线程不同的线程.这看起来与我看到代码的印象相符 - HornetQ客户端使用线程池来发送消息.
这让我感到困惑.javadocs说会话是a "single-thread object",并且代码同意 - 那里没有明显的同步.但是onMessage()在多个线程中调用时,message.acknowledge()也会在多个线程中调用,而那个线程只会委托给会话.
这应该怎么样?如何查看哪个MessageHandler不从多个线程访问会话?
更进一步,我如何从onMessage()中发送后续消息?我正在使用HornetQ作为持久的"待办事项"工作队列,因此发送后续消息对我来说是一个典型的用例.但同样,在内部onMessage(),我在访问会话的错误线程中.
请注意,我可以远离MessageHandler,只是以send() / receive()允许我控制线程的方式使用.但我确信我根本不了解整个情况,而且结合多线程只是在寻找麻烦.
小智 1
我可以回答你的部分问题,尽管我希望你现在已经解决了这个问题。
形成ClientConsumer 上的 HornetQ 文档(重点是我的):
ClientConsumer 从 HornetQ 队列接收消息。
可以使用 receive() 方法同步使用消息,该方法将阻塞直到收到消息(或超时到期),也可以通过设置 MessageHandler 异步使用消息。
这两种类型的消费是互斥的:如果调用了具有 MessageHandler 集的 ClientConsumer 的 receive() 方法,则会抛出 HornetQException。
因此,您在处理消息接收方面有两种选择:
.receive()或.receive(long itmeout)在您闲暇时ClientMessage检索
调用返回的(可选)对象Session您希望在 Consumer 中携带的内容,您可以根据需要转发消息.receive()onMessage(ClientMessage)
Session从该对象中检索 ,因为它没有由接口公开。QueueSession jmsSession = jmsConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);它来获取新的 Session 并从中发送消息...据我所知,这可能没问题,因为 Session 对象并未真正重新创建。我这样做也是因为 Sessions 有变得陈旧的趋势。我认为你不应该那么想要掌控一切执行线程,尤其是仅转发消息的瞬态线程。正如您所猜测的,HornetQ 具有内置线程池,并且可以有效地重用这些对象。
另外,正如您所知,您不需要在单个线程中访问对象(如队列),因此如果通过多个线程甚至通过多个会话访问队列并不重要。您只需确保会话只能由一个线程访问,这是 MessageListener 的设计初衷。
| 归档时间: |
|
| 查看次数: |
496 次 |
| 最近记录: |