bma*_*anc 5 java connection activemq-classic
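I have a class that obtains an ActiveMQ connection using ActiveMQConnectionFactory.createConnection(). It then creates and owns the session, the destination, and the consumer listening on that destination (a queue).
I call receive() or receive(millis) on the consumer to take my messages off the queue. In some cases I have to kill (interrupt) the thread that is calling the receive method. I try to close the session and the connection immediately afterwards. Unfortunately, I often get an exception when calling close, and the associated "ActiveMQ Transport" thread (and the related connection to the broker) stays alive. The exception I get is: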
org.myorg.awsiface.communication.MessagingException: Failed to close JMS connection
    at org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:253)
    at org.myorg.aws.communication.protocol.ContextFinishedProtocol.cleanUp(ContextFinishedProtocol.java:94)
    at org.myorg.myservice.job.Job.run(Job.java:206)
Caused by: javax.jms.JMSException: java.io.InterruptedIOException
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
    at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1227)
    at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
    at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1799)
    at org.apache.activemq.ActiveMQMessageConsumer.doClose(ActiveMQMessageConsumer.java:636)
    at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:627)
    at org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:232)
    ... 2 more
Caused by: java.io.InterruptedIOException
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:102)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
    ... 7 more
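I have tried the following connection URLs:
failover://tcp://broker.ip.address:61616
tcp://broker.ip.address:61616
failover://tcp://broker.ip.address:61616?trace=true&closeAsync=false
I also tried calling MessageConsumer::receiveNoWait() and doing my own Thread.sleep between calls, but I always end up with the exception above whenever I run the following to close the connection: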
try {
    // I added the following two lines to see if the taskRunner was handling the threads - still no luck
    TaskRunnerFactory taskRunner = ((ActiveMQConnection)connection).getSessionTaskRunner();
    taskRunner.shutdown();
    if (producer != null) {
        producer.close();
    }
    if (consumer != null) {
        consumer.close();
    }
    session.close();
    if (connection instanceof ActiveMQConnection) {
        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
        amqConnection.stop();
    }
    connection.stop();
    connection.close();
}
catch (ConnectionClosedException e) {
    // NOOP - this is ok
}
catch (Exception e) {
    throw new MessagingException("Failed to close JMS connection", e, log);
}
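Reading the trace, the InterruptedIOException surfaces in WireFormatNegotiator.oneway while consumer.close() sends its packet, which would be consistent with the closing thread still carrying the interrupt flag from the earlier interrupt. That reading is an assumption drawn from the trace, not confirmed behaviour. The sketch below is not ActiveMQ code: it uses a plain CountDownLatch as a hypothetical stand-in for any cleanup step that performs an interruptible wait, and shows that such a step fails while the flag is set and succeeds once the flag has been cleared with Thread.interrupted(). The class and method names are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class InterruptFlagDemo {

    // Hypothetical stand-in for a cleanup step that waits on a latch before sending.
    public static boolean cleanUp(CountDownLatch ready) {
        try {
            // await() throws InterruptedException immediately if the caller's
            // interrupt flag is set, even when the latch is already open.
            return ready.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch ready = new CountDownLatch(0);   // already open

        Thread.currentThread().interrupt();             // simulate the interrupted receiver thread
        boolean first = cleanUp(ready);                 // fails: the flag is still set

        boolean wasInterrupted = Thread.interrupted();  // reads AND clears the flag
        boolean second = cleanUp(ready);                // succeeds on the same thread

        if (wasInterrupted) {
            Thread.currentThread().interrupt();         // re-assert once cleanup is done
        }
        System.out.println("first=" + first + ", second=" + second);
        // prints: first=false, second=true
    }
}
```

If the same mechanism is at work in the cleanUp() method above, clearing the flag before closing, or closing from a thread that was never interrupted, may be worth trying.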
小智 2
I'm not sure why close isn't working for you, but you can accomplish what you want by using an asynchronous message listener.
Connection connection = connectionFactory.createConnection();
connection.start();
Session session =
    connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue("queueName");
// the JMS type is MessageConsumer; there is no javax.jms.Consumer
MessageConsumer consumer = session.createConsumer( queue );
consumer.setMessageListener( new MessageListener()
{
    public void onMessage( Message message )
    {
        // whatever was after the receive() method call
    }
} );
That way there is no need to interrupt anything. When you are done, close everything:
consumer.close();
session.close();
// a Destination has no close() method, so the queue itself needs no cleanup
connection.close();
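If the polling style with receive(millis) has to stay, a stop flag checked between timed receives is another way to avoid interrupting the thread. The following is only a sketch of that idea using the standard library: a BlockingQueue with poll(timeout) stands in for the JMS consumer and receive(millis), and StopFlagConsumer, stop() and consumeAndStop() are hypothetical names, not part of JMS or ActiveMQ.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class StopFlagConsumer implements Runnable {
    private final BlockingQueue<String> queue;            // stand-in for the JMS consumer
    private final AtomicInteger handled = new AtomicInteger();
    private volatile boolean running = true;

    public StopFlagConsumer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    public void stop() { running = false; }               // request shutdown without interrupting

    public int handledCount() { return handled.get(); }

    @Override
    public void run() {
        try {
            while (running) {
                // Analogous to consumer.receive(100): returns null when the timeout expires.
                String message = queue.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    handled.incrementAndGet();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // In the JMS version, consumer/session/connection would be closed here,
        // on a thread whose interrupt flag was never set.
    }

    // Feeds the messages to a worker, waits until they are handled, then stops it.
    public static int consumeAndStop(List<String> messages) {
        StopFlagConsumer consumer = new StopFlagConsumer(new LinkedBlockingQueue<>(messages));
        Thread worker = new Thread(consumer, "consumer");
        worker.start();
        try {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (consumer.handledCount() < messages.size() && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            consumer.stop();       // the loop notices the flag within one poll timeout
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return consumer.handledCount();
    }

    public static void main(String[] args) {
        System.out.println("handled=" + consumeAndStop(Arrays.asList("a", "b")));
    }
}
```

The trade-off is shutdown latency: the worker only notices the stop request after the current timed wait returns, so the timeout bounds how long shutdown can take.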