Spring RabbitMQ - 在具有@RabbitListener配置的服务上使用手动通道确认

Gur*_*uru 14 java rabbitmq spring-rabbit spring-amqp spring-rabbitmq

如何在不使用自动确认的情况下手动确认消息.是否有与一起使用的方式@RabbitListener@EnableRabbit风格的配置.大多数文档告诉我们SimpleMessageListenerContainer一起使用ChannelAwareMessageListener.但是使用它会失去注释提供的灵活性.我已经配置了我的服务如下:

@Service
public class EventReceiver {

@Autowired
private MessageSender messageSender;

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order) throws Exception {

  // code for processing order
}
Run Code Online (Sandbox Code Playgroud)

我的RabbitConfiguration如下

@EnableRabbit
public class RabbitApplication implements RabbitListenerConfigurer {

public static void main(String[] args) {
    SpringApplication.run(RabbitApplication.class, args);
}

@Bean


public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new  MappingJackson2MessageConverter();
        return converter;
    @Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(rabbitConnectionFactory());
      factory.setMaxConcurrentConsumers(5);
      factory.setMessageConverter((MessageConverter) jackson2Converter());
      factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
      return factory;
    }

@Bean
public ConnectionFactory rabbitConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost("localhost");
    return connectionFactory;
}

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
    registrar.setContainerFactory(myRabbitListenerContainerFactory());
}

@Autowired
private EventReceiver receiver;
}
}
Run Code Online (Sandbox Code Playgroud)

如何调整手动通道确认以及上述配置风格将获得任何帮助.如果我们实现ChannelAwareMessageListener,那么onMessage签名将会改变.我们可以在服务上实现ChannelAwareMessageListener吗?

Gar*_*ell 24

添加Channel@RabbitListener方法...

@RabbitListener(queues = "${eventqueue}")
public void receiveMessage(Order order, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    ...
}
Run Code Online (Sandbox Code Playgroud)

并使用basicAck,basicReject.中的标签.

编辑

@SpringBootApplication
@EnableRabbit
public class So38728668Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38728668Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend("", "so38728668", "foo");
        context.getBean(Listener.class).latch.await(60, TimeUnit.SECONDS);
        context.close();
    }

    @Bean
    public Queue so38728668() {
        return new Queue("so38728668");
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    public static class Listener {

        private final CountDownLatch latch = new CountDownLatch(1);

        @RabbitListener(queues = "so38728668")
        public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
                throws IOException {
            System.out.println(payload);
            channel.basicAck(tag, false);
            latch.countDown();
        }

    }

}
Run Code Online (Sandbox Code Playgroud)

application.properties:

spring.rabbitmq.listener.acknowledge-mode=manual
Run Code Online (Sandbox Code Playgroud)

  • 该属性[已移至`spring.rabbitmq.listener.simple.acknowledge-mode`](https://github.com/spring-projects/spring-boot/blob/1.5.x/spring-boot-autoconfigure/src /main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java#L484)。在Spring Boot 2.0中,它可以是“ spring.rabbitmq.listener.simple.acknowledge-mode”或“ spring.rabbitmq.listener.direct.acknowledge-mode”,因为Spring AMQP现在支持2种容器类型。参见[文档](https://docs.spring.io/spring-boot/docs/2.0.0.RC1/reference/html/common-application-properties.html)。 (8认同)
  • 有人可以向我解释为什么Java程序员拒绝将导入内容放入其代码示例中吗?我觉得这样可以节省我几个小时。 (5认同)

小智 5

万一您需要使用ChannelAwareMessageListener类中的#onMessage()。然后,您可以通过这种方式进行操作。

@Component
public class MyMessageListener implements ChannelAwareMessageListener {

@Override
public void onMessage(Message message, Channel channel) {
    log.info("Message received.");
    // do something with the message
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
Run Code Online (Sandbox Code Playgroud)

}

而对于RabbitConfiguration

@Configuration
public class RabbitConfig {

public static final String topicExchangeName = "exchange1";

public static final String queueName = "queue1";

public static final String routingKey = "queue1.route.#";

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("xxxx");
    connectionFactory.setPassword("xxxxxxxxxx");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("vHost1");
    return connectionFactory;
}

@Bean
public RabbitTemplate rabbitTemplate() {
    return new RabbitTemplate(connectionFactory());
}

@Bean
Queue queue() {
    return new Queue(queueName, true);
}

@Bean
TopicExchange exchange() {
    return new TopicExchange(topicExchangeName);
}

@Bean
Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}


@Bean
public SimpleMessageListenerContainer listenerContainer(POCRabbitMessageListener pocRabbitMessageListener) {
    SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
    listenerContainer.setConnectionFactory(connectionFactory());
    listenerContainer.setQueueNames(queueName);
    listenerContainer.setMessageListener(pocRabbitMessageListener);
    listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    listenerContainer.setConcurrency("4");
    listenerContainer.setPrefetchCount(20);
    return listenerContainer;
}
Run Code Online (Sandbox Code Playgroud)

}