当队列持久时,慢速HornetQ Producer

Ana*_*oni 6 java queue jboss hornetq

我在horntQ中尝试过Persistent Queue.我制作了两个单独的例子(Producer,Consumer).我的消费者工作得很好,但制作人花了太多时间来完成发送消息.我既分别运行,也分别运行.可能是什么问题呢?我的代码是:

public  class HornetProducer implements Runnable{

    Context ic = null;
    ConnectionFactory cf = null;
    Connection connection =  null;
    Queue queue = null;
    Session session = null;
    MessageProducer publisher =  null;
    TextMessage message = null;
    int messageSent=0;

     public synchronized static Context getInitialContext()throws javax.naming.NamingException {

            Properties p = new Properties( );
            p.put(Context.INITIAL_CONTEXT_FACTORY,"org.jnp.interfaces.NamingContextFactory");
            p.put(Context.URL_PKG_PREFIXES," org.jboss.naming:org.jnp.interfaces");
            p.put(Context.PROVIDER_URL, "jnp://localhosts:1099");

            return new javax.naming.InitialContext(p);
        }  

    public HornetProducer()throws Exception{            

        ic = getInitialContext();
        cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
        queue = (Queue)ic.lookup("queue/testQueue2");
        connection = cf.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        
        publisher = session.createProducer(queue);
        connection.start();

    }

    public void publish(){      
        try{        

            message = session.createTextMessage("Hello!");
            System.out.println("StartDate: "+new Date());

            for(int i=0;i<10000;i++){                   
                 messageSent++;              
                 publisher.send(message);                
            }
            System.out.println("EndDate: "+new Date());
        }catch(Exception e){
            System.out.println("Exception in Consume: "+ e.getMessage());
        }           
    }

    public void run(){
         publish();
    }

    public static void main(String[] args) throws Exception{

        new HornetProducer().publish();    
    }

}
Run Code Online (Sandbox Code Playgroud)

Cle*_*nic 3

您正在持续地、非事务性地发送这些消息。这意味着,发送的每条消息都必须单独完成。

这意味着对于您发送的每条消息,您都必须与服务器进行网络往返,并等待其完成持久性,然后才能发送另一条消息。

如果在这种情况下您有多个生产者,hornetq 会对两个生产者进行批处理,这样您会节省大量时间。(即服务器将批量处理许多写入请求)。

如果你想加快单个生产者的发送速度,你可能应该使用事务。

例如:

I - 将您的会话更改为已交易:

session = connection.createSession(true, Session.SESSION_TRANSACTIONED); 
Run Code Online (Sandbox Code Playgroud)

II - 提交每 N 条消息:

   for(int i=0;i<10000;i++){                   
         messageSent++;              
         publisher.send(message);  
         if (messageSent % 1000 == 0) session.commit();              
    }
    session.commit();
Run Code Online (Sandbox Code Playgroud)

您还可以禁用持久消息的同步。(异步发送)。