标签: spring-rabbit

如何在Spring AMQP中使用Ack或Nack

我是Spring AMQP的新手.我有一个应用程序,它是一个生产者向另一个消费者的应用程序发送消息.

消费者收到消息后,我们将对数据进行验证.

如果数据正确,我们必须确认并且应该从队列中删除消息.如果数据不正确,我们必须NACK(否定确认)数据,以便它将在RabbitMQ中重新排队.

我碰到

**factory.setDefaultRequeueRejected(false);**(它根本不会重新排列消息)

**factory.setDefaultRequeueRejected(true);**(它会在发生异常时重新排列消息)

但我的情况是,我将基于验证确认该消息.然后它应该删除该消息.如果NACK然后重新排列该消息.

我在RabbitMQ网站上看过

AMQP规范定义了basic.reject方法,该方法允许客户拒绝单个传递的消息,指示代理丢弃它们或重新排队它们

如何实现上述场景?请给我一些例子.

我尝试了一个小程序

       logger.info("Job Queue Handler::::::::::" + new Date());
        try {

        }catch(Exception e){

            logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::");

        }

        factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{
            return cause instanceof XMLException;
        }));
Run Code Online (Sandbox Code Playgroud)

消息不是为不同的异常factory.setDefaultRequeueRejected(true)重新排队

09:46:38,854 ERROR [stderr](SimpleAsyncTaskExecutor-1) org.activiti.engine.ActivitiObjectNotFoundException:没有使用键'WF89012'部署的进程

09:46:39,102 INFO [com.example.bip.rabbitmq.handler.ErrorQueueHandler](SimpleAsyncTaskExecutor-1)从错误队列收到:{ERROR =无法提交JPA事务; 嵌套异常是 javax.persistence.RollbackException:标记为rollbackOnly的事务 }

rabbitmq spring-rabbit spring-amqp spring-rabbitmq

9
推荐指数
1
解决办法
1万
查看次数

8
推荐指数
3
解决办法
5557
查看次数

带有弹簧靴的rabbitmq中的例外情况

当我用rabbitmq启动spring boot应用程序时,我反复得到以下异常.即使有以下例外情况,整个流程也能正常运行.是通过手段自动删除吗?

    08 Jul 2015 16:20:17,652 [ERROR] [SimpleAsyncTaskExecutor-2] SimpleMessageListenerContainer| Failed to check/redeclare auto-delete queue(s).
    java.util.concurrent.TimeoutException
        at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77)
        at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:111)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:37)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:367)
        at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:293)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:621)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:665)
        at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:208)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:444)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:80)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:130)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1004)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:254)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:963)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$300(SimpleMessageListenerContainer.java:83)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1081)
        at java.lang.Thread.run(Thread.java:745)

08 Jul 2015 16:52:47,148 [WARN ] [SimpleAsyncTaskExecutor-1] SimpleMessageListenerContainer| Consumer raised exception, processing can restart if the connection factory supports it
java.util.concurrent.TimeoutException
    at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:77)
    at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:111) …
Run Code Online (Sandbox Code Playgroud)

java spring rabbitmq spring-rabbit spring-rabbitmq

8
推荐指数
1
解决办法
2万
查看次数

如何使用spring-rabbitmq将消息标记为持久性?

这就是我创建交换并将队列绑定到它的方式

<rabbit:topic-exchange id="dataExchange" name="MQ-EXCHANGE" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="COMM_QUEUE" pattern="queue.*" />
        </rabbit:bindings>
</rabbit:topic-exchange>
Run Code Online (Sandbox Code Playgroud)

我已经在互联网上阅读了很多帖子,其中写道如果要保护兔子或队列崩溃,还需要将消息标记为持久性.但我无法弄清楚如何标记我的消息持久性.

这就是我将消息发布到队列的方式

    @Autowired
    private RabbitTemplate template;

    @Override
    public void produceMessage(Object message, String routingKey) {
        template.convertAndSend(routingKey, message);  
    }
Run Code Online (Sandbox Code Playgroud)

我找了不同的API方法来了解这一点,并试图寻找我可以在XML中配置但无法找到方法的任何特定属性.任何指导?

rabbitmq spring-rabbit spring-amqp

8
推荐指数
1
解决办法
3124
查看次数

防止 spring-rabbit 中的 @RabbitListener 在集成测试期间尝试连接到服务器

我想为我使用 rabbitMq 的服务运行一些验收测试,但我想忽略所有需要服务间通信 (amqp) 的服务。

然而,问题是 Spring 尝试在启动时连接到(不存在的)rabbit 主机,以便它可以注册其使用者。它对每个注释的方法都@RabbitListener这样做,如果我的服务中有多个侦听器,这可能会因长时间超时而变得非常烦人。

我怎样才能减少这个超时甚至一起阻止@RabbitListener 连接?

我们的(简化的)Rabbit 配置:

@Configuration
@EnableRabbit
public class RabbitMqConfig {

    public RabbitMqConfig(
            @Value("${rabbitmq.host}") String rabbitHost,
            @Value("${rabbitmq.port}") int rabbitPort,
            @Value("${exchange.name}") String exchange) {
        this.rabbitHost = rabbitHost;
        this.rabbitPort = rabbitPort;
        this.exchange= exchange;
    }

  @Bean
  DirectExchange directExchangeBean() {
    return new DirectExchange(this.exchange, true, false);
  }

  @Bean
  public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost);
    connectionFactory.setPort(rabbitPort);
    return connectionFactory;
  }

  @Bean
  public RabbitTemplate rabbitTemplate() {
    return new RabbitTemplate(connectionFactory());
  }


  @Bean
  public Queue itemDoneQueue() { …
Run Code Online (Sandbox Code Playgroud)

java spring rabbitmq spring-rabbit

8
推荐指数
2
解决办法
9952
查看次数

在spring-amqp中读取兔子信息的最早进入点是什么?

我将线程本地兔子消息数据存储在MDC中.我想清除旧的并为传入的兔子消息添加新的上下文数据,例如从头部读取某些值或将兔子消息有效负载读取为byte[].不幸的是,我经常看到在消息发出带@RabbitHandler注释的方法之前发生异常.是否有一个更早的入口点可以用来建立这个上下文?我不知道在反序列化发生之前会发生什么,但理想情况下我想在尝试反序列化之前访问该消息.也许在onMessageReceived(byte[] message, Map headers)某处有一个方法钩子.调用堆栈越早越好.

java spring rabbitmq spring-rabbit spring-amqp

8
推荐指数
1
解决办法
600
查看次数

Spring AMQP RabbitMQ实现优先级队列

谷歌待了几天,我相信我完全迷失了.我想实现一种有大约3个队列的优先级队列:

  1. 高优先级队列(每日),需要先处理.
  2. 中间优先级队列(每周),如果队列#1中没有项目将处理.(这个队列中的消息是好的,它根本不会处理)
  3. 低优先级队列(每月),如果队列#1和#2中没有项目将处理.(这个队列中的消息是好的,它根本不会处理)

最初,我有以下流程,让消费者使用来自所有三个队列的消息,并检查队列#1,#2和#3中是否有任何项目.然后我意识到这是错误的,因为:

  1. 我完全迷失了一个问题:"我怎么知道它来自哪个队列?".
  2. 我已经在消耗任何队列中的消息了,所以如果我从较低优先级的队列中获取一个对象,如果我发现优先级较高的队列中有消息,我会把它放回队列吗?

以下是我目前的配置,它显示了我是个白痴.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

<rabbit:connection-factory id="connectionFactory" host="localhost" />

<rabbit:template id="amqpTemplatead_daily" connection-factory="connectionFactory"
    exchange="" routing-key="daily_queue"/>

<rabbit:template id="amqpTemplatead_weekly" connection-factory="connectionFactory"
    exchange="" routing-key="weekly_queue"/>

<rabbit:template id="amqpTemplatead_monthly" connection-factory="connectionFactory"
    exchange="" routing-key="monthly_queue"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="daily_queue" />
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="weekly_queue" />
</rabbit:listener-container>    

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="monthly_queue" />
</rabbit:listener-container>    

<bean id="Consumer" class="com.test.Consumer" />

</beans>
Run Code Online (Sandbox Code Playgroud)

知道如何用优先级队列解决这个问题?

ps:我也想知道,如果Apache Camel有我可以依赖的东西吗?

更新1:我刚从Apache Camel看到这个:" https://issues.apache.org/jira/browse/CAMEL-2537 "JMSPriority上的音序器似乎是我正在寻找的,任何人都曾尝试过这个吗?

更新2:假设我在@Gary Russell推荐下使用RabbitMQ的插件,我有以下spring-rabbitmq上下文XML配置,这似乎有意义(由guest ..): …

java apache-camel rabbitmq spring-rabbit spring-amqp

7
推荐指数
1
解决办法
5210
查看次数

发送时在rabbitmq中设置邮件头

我想在向兔子发送消息时设置消息标题.我使用下面的代码,但混淆了如何设置邮件头.

public static <T> void sendMessage(String routingKey,final Object message,Class<T> type){
    DefaultClassMapper typeMapper = new DefaultClassMapper();
    typeMapper.setDefaultType(type);

    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setClassMapper(typeMapper);

    RabbitTemplate template = new RabbitTemplate(getConnectionFactory));
    template.setMessageConverter(converter);

    template.convertAndSend(routingKey, message);
}
Run Code Online (Sandbox Code Playgroud)

在上面的方法中,我只是争论java POJO对象及其要发送的类型.我想知道我应该在哪里设置邮件标题.

amqp rabbitmq jackson spring-rabbit spring-amqp

7
推荐指数
1
解决办法
7192
查看次数

如何在Spring Boot应用程序中使用spring-rabbit处理JSON消息?

这是我的代码段。

但是,运行应用程序时出现以下错误消息。

2017-02-28 17:16:35.931  WARN 11828 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:872) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:782) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:702) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at …
Run Code Online (Sandbox Code Playgroud)

java spring spring-rabbit spring-amqp spring-boot

7
推荐指数
2
解决办法
5869
查看次数

Spring注解@Retryable - 如何设置拦截器

我在类@Retryable中的方法上使用注释@Service

@Service
@EnableRetry 
public class PushService {

    @Retryable(maxAttempts=5)
    public Result pushIt(myMessage messageIn) {
        ...
    }
}
Run Code Online (Sandbox Code Playgroud)

它的工作方式就像一个魅力:我直接从 RabbitMQ 收到一条消息,直到没有错误或尝试次数达到 5 为止,它才会被确认,那时消息会直接进入 DLQ,就像我一样通缉。

我唯一的问题是我需要从属性文件动态设置 maxAttempts 。解决方案应该是设置一个拦截器,但是只有一个拦截器会导致错误,例如当我有:

@Service
@EnableRetry 
public class PushService {

    @Retryable(interceptor="myInterceptor") 
    public Result pushIt(myMessage messageIn) {
        ...
    }
}
Run Code Online (Sandbox Code Playgroud)

其中 myInterceptor 定义为:

@Bean
public StatefulRetryOperationsInterceptor myInterceptor() {
    return RetryInterceptorBuilder.stateful().maxAttempts(5).build();
}
Run Code Online (Sandbox Code Playgroud)

我得到一个无限循环,但有以下例外:

2015-04-08 07:12:10,970 GMT [SimpleAsyncTaskExecutor-1] (ConditionalRejectingErrorHandler.java:67) WARN  listener.ConditionalRejectingErrorHandler: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:864)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:802)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:690)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82) …
Run Code Online (Sandbox Code Playgroud)

java spring spring-rabbit spring-retry

6
推荐指数
1
解决办法
2万
查看次数