新创建的客户端是否知道hornetq中的旧消息?

Sac*_*ach 4 java jms hornetq

我在一个消费者的hornetQ中创建了会话,然后我使用producer在队列中添加了4个消息.在此之后,我创造了新的消费者.

这位消费者会不会了解旧消息?

如果不是,是否可以在XML中配置它?

我创建了一个无法获取以前消息的新消费者.我只想确认这种行为是否正确?我没有在文档中找到任何帮助.

以下是代码段:

TextMessage receivedMessage = (TextMessage)consumer.receive(); 
receivedMessage.acknowledge();
System.out.println("Got order: " + receivedMessage.getText());
//consumer.close();
MessageConsumer newConsumer = session.createConsumer(orderQueue);
receivedMessage = (TextMessage)newConsumer.receive();
receivedMessage.acknowledge();  
System.out.println("Got order: " + receivedMessage.getText());
Run Code Online (Sandbox Code Playgroud)


如果我取消注释consumer.close()行,它可以正常工作.
我的hornetq-jms.xml

<connection-factory name="NettyConnectionFactory">
      <xa>true</xa>
      <connectors>
         <connector-ref connector-name="netty"/>
      </connectors>
      <entries>
         <entry name="/XAConnectionFactory"/>
      </entries>
       <consumer-window-size>0</consumer-window-size>
   </connection-factory>

   <connection-factory name="NettyConnectionFactory">
      <xa>false</xa>
      <connectors>
         <connector-ref connector-name="netty"/>
      </connectors>
      <entries>
         <entry name="/ConnectionFactory"/>
      </entries>
       <consumer-window-size>0</consumer-window-size>
   </connection-factory>

   <connection-factory name="NettyThroughputConnectionFactory">
      <xa>true</xa>
      <connectors>
         <connector-ref connector-name="netty-throughput"/>
      </connectors>
      <entries>
         <entry name="/XAThroughputConnectionFactory"/>
      </entries>
       <consumer-window-size>0</consumer-window-size>
   </connection-factory>

   <connection-factory name="NettyThroughputConnectionFactory">
      <xa>false</xa>
      <connectors>
         <connector-ref connector-name="netty-throughput"/>
      </connectors>
      <entries>
         <entry name="/ThroughputConnectionFactory"/>
      </entries>
       <consumer-window-size>0</consumer-window-size>
   </connection-factory>
Run Code Online (Sandbox Code Playgroud)

连接工厂的代码片段

TransportConfiguration transportConfiguration = new
                TransportConfiguration(NettyConnectorFactory.class.getName());
        HornetQConnectionFactory cf =
                HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,getTransportConfiguration());
        Queue orderQueue = HornetQJMSClient.createQueue("MutationPipelineQueue");
Run Code Online (Sandbox Code Playgroud)

getTransportConfiguration()的代码:

private synchronized static TransportConfiguration getTransportConfiguration() {
        HashMap<String, TransportConfiguration> transportConfigurationMap = new HashMap<String, TransportConfiguration>();
        TransportConfiguration tc = transportConfigurationMap.get("machinename:5455");
        if(tc == null){
            Map<String, Object> connectionParams = new HashMap<String, Object>();
            connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME,"machinename");
            connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME,Integer.valueOf("5455"));
            tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);
            transportConfigurationMap.put("machinename:5455", tc);
        }
        return tc;
Run Code Online (Sandbox Code Playgroud)

Cle*_*nic 7

是的,它会知道您的旧消息.但是在这种情况下,您的旧消费者仍然处于打开状态,因此消费者将在其缓冲区上缓存消息,除非您关闭它,或者您更改了consumer-window-size = 0.

大多数消息系统将在消费者面前预先缓存,因此下次您在消费者上呼叫接收时,消息将准备好接收.

但是,如果您的消费者很慢并且您没有那么多消息,则消息将在客户端缓冲区上,直到您关闭该消费者.

对于生产中的快速消费者而言,最好的是提前缓存,因为这将提高您的吞吐量,这将受到网络延迟的限制而无需缓存.

在HornetQ案例中,您可以通过设置consumer-window-size = 0来应对缓慢的消费者.

http://docs.jboss.org/hornetq/2.3.0.beta3/docs/user-manual/html/flow-control.html#flow-control.consumer.window

在您通过JNDI查找实例化连接工厂的情况下:

   <connection-factory name="ConnectionFactory">
      <connectors>
         <connector-ref connector-name="netty-connector"/>
      </connectors>
      <entries>
         <entry name="ConnectionFactory"/>
      </entries>

      <!-- We set the consumer window size to 0, which means messages are not buffered at all
      on the client side -->
      <consumer-window-size>0</consumer-window-size>

   </connection-factory>
Run Code Online (Sandbox Code Playgroud)

或者,如果您直接实例化连接工厂,则必须在实例中设置consumerWindowSize:

TransportConfiguration transportConfiguration = new
                TransportConfiguration(NettyConnectorFactory.class.getName());
HornetQConnectionFactory cf =
HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,getTransportConfiguration());
cf.setConsumerWindowSize(0) // <<<<<< here
Run Code Online (Sandbox Code Playgroud)

这是来自HornetQ发行版示例/ jms/no-consumer-buffering的一个运行示例.它与您的代码段完全相同,并且每次都有效:

 // Step 5. Create a JMS Session
 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 // Step 6. Create a JMS Message Producer
 MessageProducer producer = session.createProducer(queue);

 // Step 7. Create a JMS MessageConsumer

 MessageConsumer consumer1 = session.createConsumer(queue);

 // Step 8. Start the connection

 connection.start();

 // Step 9. Send 10 messages to the queue

 final int numMessages = 10;

 for (int i = 0; i < numMessages; i++)
 {
    TextMessage message = session.createTextMessage("This is text message: " + i);

    producer.send(message);
 }


 System.out.println("Sent messages");

 // Step 10. Create another consumer on the same queue

 MessageConsumer consumer2 = session.createConsumer(queue);

 // Step 11. Consume three messages from consumer2

 for (int i = 0; i < 3; i++)
 {
    TextMessage message = (TextMessage)consumer2.receive(2000);

    System.out.println("Consumed message from consumer2: " + message.getText());
 }
Run Code Online (Sandbox Code Playgroud)

正如您在此示例中所看到的,正在接收旧消息.

与此不同的是您的系统配置错误.也许你没有设置正确的连接工厂?

BTW:在ActiveMQ上,您可以管理预取限制以管理相同的行为:

http://activemq.apache.org/what-is-the-prefetch-limit-for.html

这个问题与多个消费者JMS队列完全重复

至于追溯消息是ActiveMQ上的另一个仅适用于主题的功能,正在使用旧消息创建订阅.