标签: spring-rabbit

死信时如何指定兔子信息的附加信息

我有一个兔子队列,上面有要消费的消息。我也有一个失败的听众。队列配置有死信交换(以及死信队列)。我想要的是在死信队列中的消息中看到异常信息。

当前的工作方式如下:

  1. 我将损坏的消息发送到我的普通队列。
  2. 我的侦听器(我正在使用Java的org.springframework.amqp.core.MessageListener)失败,并显示类似以下内容:“ java.lang.RuntimeException:损坏的消息”
  3. 邮件被拒绝,并通过死信交换进入死信队列。
  4. 当我查看Rabbit Admin UI中的死信时,我看到:标头:
    x-death:
    原因:被拒绝

但是我想要的是在UI上的某个地方看到“ java.lang.RuntimeException:损坏的消息”。我认为它应该是一个自定义标头?

例如,是否可以将常规try-catch放入我的侦听器,并使用异常信息增强标头?

java rabbitmq spring-rabbit spring-amqp rabbitmq-exchange

4
推荐指数
1
解决办法
1518
查看次数

如何使用非线程安全的MessageListener实现并发

这个问题的答案解释了<rabbit:listener-container/>当侦听器不是线程安全时如何在Spring AMQP中使用原型范围.

另一位用户(在评论中)询问如何仅使用Java配置来配置相同的环境.

rabbitmq spring-rabbit spring-amqp

4
推荐指数
1
解决办法
1181
查看次数

Spring rabbitmq 发送到交换与动态绑定

我尝试使用 TopicExchange 来屏蔽消息。

配置:

    <rabbit:connection-factory id="connectionFactory"  host="localhost" username="guest" password="guest"/>

<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

<rabbit:queue name="sample.queue"/>

<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

<rabbit:annotation-driven container-factory="rabbitListenerContainerFactory"/>

<rabbit:listener-container connection-factory="connectionFactory" />
Run Code Online (Sandbox Code Playgroud)

成分:

@Component
public class JmsComponent {

    private final Logger log = LoggerFactory.getLogger(JmsComponent.class);

    private final TopicExchange exchange = new TopicExchange("sample.exchange");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Queue queue;

    private String received;

    public void send(String msg) {
        rabbitTemplate.convertAndSend("sample.queue", new SimpleMessage(msg));
    }

    public void bindToKey(String keyMask) {
        BindingBuilder.bind(queue).to(exchange).with(keyMask);
        rabbitTemplate.setExchange(exchange.getName());
    }


    public void sendByKey(String …
Run Code Online (Sandbox Code Playgroud)

java spring rabbitmq spring-rabbit rabbitmq-exchange

4
推荐指数
1
解决办法
7825
查看次数

RabbitMQ - 向队列中的特定消费者发送消息

这是场景 - 有多个应用服务器。浏览器可以通过 websocket 连接到任何应用服务器。

应用服务器(消费者)都在监听一个特定的队列。一旦接收到 Web 套接字连接,特定的应用程序服务器就会将带有路由键 {userId} 的队列绑定到直接交换。

我希望发送到带有路由密钥 {userId} 的直接交换的消息只能由发生绑定的特定应用服务器接收。

在这种情况下,直接交换是正确的交换吗?还是应该使用其他类型的交换?

当 websocket 进入时,我正在使用 spring-amqp 创建动态绑定

// create the RabbitMq queue and bind to it
String routingKey = MessageConstants.getRoutingKeyForUserRecommendationQueue(user);
Binding userRecommendationBinding = BindingBuilder.bind(userRecommendationsQueue).
    to(directExchange).with(routingKey);
amqpAdmin.declareBinding(userRecommendationBinding);
Run Code Online (Sandbox Code Playgroud)

rabbitmq spring-rabbit spring-amqp

4
推荐指数
2
解决办法
5850
查看次数

如何启动,停止和重新连接spring rabbitmq监听器容器及其连接?

我有一个互联网连接,其中出口网关定期更改.在发生这种情况之前的几秒钟,我正在向我的spring应用程序收到一个事件或回调通知.我想停止我的兔子消费者和连接,然后在几秒钟后(当我的网络连接恢复时)再次连接.

我正在使用spring amqp的基于注释的方法,但我也可以切换到另一种实现方式.

我知道spring-amqp正在为我重新连接,但我想自己做这件事来控制它.我如何以编程方式彻底清除所有消费者(RabbitListeners)以及与rabbitmq代理的干净连接以及所有消费者的干净连接和启动?如果消费者能够重新创建,那就好了.如果这是不可能的,我将不得不重新初始化一些对象属性.

spring-rabbit spring-amqp spring-boot spring-rabbitmq

4
推荐指数
1
解决办法
2916
查看次数

连接到队列或在 spring-rabbitmq 中不存在的情况下创建

我正在将 Spring Boot 与 spring-rabbitmq 一起使用。我的连接工厂已配置完毕application.properties,看起来不错。

我的目标是:启动期间检查是否存在特定名称的队列,如果不存在则创建这样的队列。我不知道如何处理它。我应该在配置类中创建什么bean?据我所知,它应该是RabbitAdmin,但我不确定。你能帮助我吗?

java spring-rabbit spring-amqp spring-boot

4
推荐指数
1
解决办法
4964
查看次数

无法启动 bean 'stompBrokerRelayMessageHandler';嵌套异常是 java.lang.NoClassDefFoundError: io/netty/util/concurrent/EventExecutor

我正在使用 RabbitMQ 设置 Springboot 2.1.4。该项目在一个简单的代理上运行良好,但是在使用 RabbitMQ 切换到 Stomp 代理中继时,应用程序启动失败并出现以下错误。

2019-04-19 18:32:36.841 INFO 80706 --- [main] ossconcurrent.ThreadPoolTask​​Executor: 关闭 ExecutorService 'clientOutboundChannelExecutor' 2019-04-19 18:32:36.807.ThreadosExecutor --- [main] ossconcurrent.ThreadPoolTask​​Executor关闭 ExecutorService 'clientInboundChannelExecutor' 2019-04-19 18:32:36.845 INFO 80706 --- [main] o.apache.catalina.core.StandardService : 停止服务 [Tomcat] 2019-04-19 18.862: INFO6 80706 --- [主要] ConditionEvaluationReportLoggingListener:启动ApplicationContext 时出错。要显示条件报告,请在启用“调试”的情况下重新运行您的应用程序。2019-04-19 18:32:36.871 错误 80706 --- [main] osboot.SpringApplication:应用程序运行失败 org.springframework.context。ApplicationContextException:无法启动 bean 'stompBrokerRelayMessageHandler';嵌套异常是 java.lang.NoClassDefFoundError: io/netty/util/concurrent/EventExecutor at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.1.6.RELEASE.jar :5.1.6.RELEASE] 在 org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE] 在 org. springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE] 在 org.springframework.context.support.DefaultLifecycleProcessor.startBeans (DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.

java stomp netty spring-rabbit spring-boot

4
推荐指数
1
解决办法
2507
查看次数

Spring Cloud Stream Reactive - Handling Messages exception

I'm currently working on a project in Kotlin that uses rabbit with reactor to receive messages of some DTO type, and dispatch them if they are up to certain criteria. In the process of testing my code, I tried to simulate bad Message input(Since the message is coming from an external service) and see the behaviour of the subscriber. Once I got a bad input message, the subscriber stopped listening to any new input and threw the following exception:

 org.springframework.messaging.MessageDeliveryException: …
Run Code Online (Sandbox Code Playgroud)

reactor spring-rabbit kotlin spring-cloud-stream

4
推荐指数
1
解决办法
2570
查看次数

Spring amqp不向队列发布消息,而是向Exchange发布消息

我正在尝试使用多个队列测试和测试RabbitMQ的spring-amqp,因此我为每个队列创建了兔子模板并使用它来发送消息.发送的消息成功,我可以在交换机中看到一条消息,但我在队列中看不到任何内容.我猜这是非常小的设置,但无法弄明白.

这是我的applicationContext.xml

<bean id="banchmarkConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg ref="benchmarkAmqpHost"/>
        <property name="username" ref="benchmarkAmqpUser"/>
        <property name="password" ref="benchmarkAmqpPass"/>
        <property name="virtualHost" ref="benchmarkAmqpVHost"/>
        <property name="channelCacheSize" value="10"/>
</bean>
<rabbit:template id="benchmarkAmqpTemplate"
        connection-factory="banchmarkConnectionFactory"
        exchange="my_exchange"
        queue="BenchmarkQueue"
        routing-key="BenchmarkQueue" />
<rabbit:admin connection-factory="banchmarkConnectionFactory"/>
<rabbit:queue name="BenchmarkQueue" auto-delete="true" durable="false" auto-declare="true"/>
Run Code Online (Sandbox Code Playgroud)

这是我使用benchmarkAmqpTemplate发布到队列的代码.

public class publishMessage {
    @Autowired
    private RabbitTemplate benchmarkAmqpTemplate;

    protected void publish(String payload) {
        benchmarkAmqpTemplate.setQueue("BenchmarkQueue");
        benchmarkAmqpTemplate.convertAndSend("my_exchange", "BenchmarkQueue", payload);

    }
}
Run Code Online (Sandbox Code Playgroud)

当我使用HelloWorld 示例时,它确实在队列中发布了一条消息,所以想知道我是否做错了什么. 更新 我能够通过direct-exchange在我的上下文xml中添加标记来解决这个问题.我的完整xml看起来像这样:

    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsdhttp://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

<bean id="banchmarkConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
            <constructor-arg ref="benchmarkAmqpHost"/>
            <property name="username" ref="benchmarkAmqpUser"/>
            <property name="password" …
Run Code Online (Sandbox Code Playgroud)

spring rabbitmq spring-rabbit spring-amqp spring-boot

3
推荐指数
1
解决办法
5637
查看次数

java spring rabbit - 优雅地拒绝消息

我有以下监听方法:

@Override
public void onMessage(Message message, Channel channel) {
  try {
    // do something bad :)
  } catch (Exception e){
    try {
      long dt = null != message.getMessageProperties() 
          ? message.getMessageProperties().getDeliveryTag() 
          : 0;
      channel.basicReject(dt, true);
    } catch(IOException io) {
      logger.error("IO-COMMON", io);
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

问题是基本拒绝不起作用,我不知道为什么.如何优雅地拒绝它?我认为,如果我拒绝一条消息,它应该被重新排队并且驻留就像缓存一样,然后再转到下一个工作者.但实际上这条消息似乎已经丢失了.

java spring-rabbit spring-amqp

3
推荐指数
1
解决办法
2062
查看次数