Sam*_*Axe 9 java amqp rabbitmq spring-amqp
RabbitMQ的新手和Java新手.
我正在尝试使用java Spring AMQP抽象编写一个使用手动acks并处理消费者取消通知的监听器.我可以使用Spring抽象来完成这两项任务吗?
我想写一个侦听器,它将从队列中提取消息并处理该消息(可能写入数据库或其他东西).我计划使用手动确认,这样如果消息处理失败或由于某种原因无法完成,我可以拒绝并重新排队.到目前为止,我认为我发现为了使用Spring AMQP手动ack/nack/reject,我必须使用ChannelAwareMessageListener.
我意识到我应该从RabbitMQ处理消费者取消通知,但是使用ChannelAwareMessageListener我并没有真正看到为此编码的方法.我看到处理CCN的唯一方法是使用较低级别的Java客户端api编写代码,方法是调用channel.basicConsume()并传递一个DefaultConsumer允许您处理消息传递和取消的新实例.
我还没有看到我会怎样设置clientProperties的ConnectionFactory(告诉我的经纪人能够处理的CCN)因为我在配置一个bean获得工厂.
我的侦听器的伪代码和容器的创建如下.
public class MyChannelAwareListener implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
msgProcessed = processMessage(message);
if(msgProcessed)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
else
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
public static void main(String[] args) throws Exception
{
ConnectionFactory rabbitConnectionFactory;
ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext (MY_CONTEXT_PATH);
rabbitConnectionFactory = (ConnectionFactory)ctx.getBean("rabbitConnectionFactory");
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
MyChannelAwareListener listener = new MyChannelAwareListener();
container.setMessageListener(listener);
container.setQueueNames("myQueue");
container.setConnectionFactory(rabbitConnectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.start();
}
Run Code Online (Sandbox Code Playgroud)
要设置客户端属性,您需要使用setClientProperties中的方法ConnectionFactory(假设此 ConnectionFactory 是 RabbitMQ Java 库中的对象)。此方法需要Map<String, Object>包含客户端属性和功能的数据。以下几行是 RabbitMQ Java 库内的默认值:
Map<String,Object> props = new HashMap<String, Object>();
props.put("product", LongStringHelper.asLongString("RabbitMQ"));
props.put("version", LongStringHelper.asLongString(ClientVersion.VERSION));
props.put("platform", LongStringHelper.asLongString("Java"));
props.put("copyright", LongStringHelper.asLongString(Copyright.COPYRIGHT));
props.put("information", LongStringHelper.asLongString(Copyright.LICENSE));
Map<String, Object> capabilities = new HashMap<String, Object>();
capabilities.put("publisher_confirms", true);
capabilities.put("exchange_exchange_bindings", true);
capabilities.put("basic.nack", true);
capabilities.put("consumer_cancel_notify", true);
props.put("capabilities", capabilities);
Run Code Online (Sandbox Code Playgroud)
对于管理 ACK 和消费者取消,我不确定如何使用 Spring AMQP 抽象来做到这一点,但是它是完全可行的,它channel.basicConsume使您可以通过所有回调方法处理所有场景:
http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.5/rabbitmq-java-client-javadoc-3.1.5/
希望这可以帮助!