vis*_*hal 1 java spring rabbitmq spring-batch spring-amqp
我正在尝试使用spring amqp使用rabbitmq,下面是我的配置.
<rabbit:connection-factory id="rabbitConnectionFactory"
port="${rabbitmq.port}" host="${rabbitmq.host}" />
<rabbit:admin connection-factory="rabbitConnectionFactory" />
<rabbit:queue name="${rabbitmq.import.queue}" />
<rabbit:template id="importAmqpTemplate"
connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" />
<beans:bean id="importExchangeMessageListener"
class="com.stockopedia.batch.foundation.ImportMessageListener" />
<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="5">
<rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" />
</rabbit:listener-container>
Run Code Online (Sandbox Code Playgroud)
这是一个简单的Message Listener类,
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class ImportMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("consumer output: " + message);
}
}
Run Code Online (Sandbox Code Playgroud)
这是生产者(春季批次的itemWriter),
public class ImportItemWriter<T> implements ItemWriter<T> {
private AmqpTemplate template;
public AmqpTemplate getTemplate() {
return template;
}
public void setTemplate(AmqpTemplate template) {
this.template = template;
}
public void write(List<? extends T> items) throws Exception {
for (T item : items) {
Object reply = template.convertSendAndReceive(item.toString());
System.out.println("producer output: " + reply);
}
}
}
Run Code Online (Sandbox Code Playgroud)
当我运行spring批处理作业时,将调用ImportItemWriter.write.但是ImportMessageListener.onMessage不起作用.它不打印消息.我在控制台上获得所有项目的输出
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
Run Code Online (Sandbox Code Playgroud)
您的消费者没有发送结果......
@Override
public void onMessage(Message message) {
System.out.println("consumer output: " + message);
}
Run Code Online (Sandbox Code Playgroud)
将其更改为简单的POJO; 容器MessageListenerAdapter将为您处理转换,并发送结果.
@Override
public String handleMessage(String message) {
System.out.println("consumer output: " + message);
return "result";
}
Run Code Online (Sandbox Code Playgroud)
编辑:
您还没有设置任何交换或路由到您的队列.如果要使用默认交换/路由,请使用...
convertSendAndReceive("", queueName, item.toString());
Run Code Online (Sandbox Code Playgroud)
EDIT2:
或者,将routingKey模板上的模板设置为队列名称,然后您可以使用更简单的方法.
这些...sendAndReceive()方法适用于请求/回复方案,因此需要阻止.要异步执行此操作,您必须使用其中一种...send()方法,并连接自己的方法SimpleListenerContainer以接收回复; 你必须做自己的关联.使用
public void convertAndSend(Object message, MessagePostProcessor postProcessor)
Run Code Online (Sandbox Code Playgroud)
并在您的消息发布处理器中,设置replyTo和correlationId标题...
message.getMessageProperties().setReplyTo("foo");
message.getMessageProperties().setCorrelationId("bar");
Run Code Online (Sandbox Code Playgroud)
或者,Message自己构建对象(例如,通过使用MessageBuilder)并使用send方法...
template.send(MessageBuilder.withBody("foo".getBytes())
.setReplyTo("bar")
.setCorrelationId("baz".getBytes())
.build());
Run Code Online (Sandbox Code Playgroud)
每个请求都需要一个唯一的,correlationId以便您可以关联响应.
| 归档时间: |
|
| 查看次数: |
8155 次 |
| 最近记录: |