我有一个兔子队列,上面有要消费的消息。我也有一个失败的听众。队列配置有死信交换(以及死信队列)。我想要的是在死信队列中的消息中看到异常信息。
当前的工作方式如下:
但是我想要的是在UI上的某个地方看到“ java.lang.RuntimeException:损坏的消息”。我认为它应该是一个自定义标头?
例如,是否可以将常规try-catch放入我的侦听器,并使用异常信息增强标头?
这个问题的答案解释了<rabbit:listener-container/>当侦听器不是线程安全时如何在Spring AMQP中使用原型范围.
另一位用户(在评论中)询问如何仅使用Java配置来配置相同的环境.
我尝试使用 TopicExchange 来屏蔽消息。
配置:
<rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest"/>
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbit:queue name="sample.queue"/>
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" />
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<rabbit:annotation-driven container-factory="rabbitListenerContainerFactory"/>
<rabbit:listener-container connection-factory="connectionFactory" />
Run Code Online (Sandbox Code Playgroud)
成分:
@Component
public class JmsComponent {
private final Logger log = LoggerFactory.getLogger(JmsComponent.class);
private final TopicExchange exchange = new TopicExchange("sample.exchange");
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Queue queue;
private String received;
public void send(String msg) {
rabbitTemplate.convertAndSend("sample.queue", new SimpleMessage(msg));
}
public void bindToKey(String keyMask) {
BindingBuilder.bind(queue).to(exchange).with(keyMask);
rabbitTemplate.setExchange(exchange.getName());
}
public void sendByKey(String …Run Code Online (Sandbox Code Playgroud) 这是场景 - 有多个应用服务器。浏览器可以通过 websocket 连接到任何应用服务器。
应用服务器(消费者)都在监听一个特定的队列。一旦接收到 Web 套接字连接,特定的应用程序服务器就会将带有路由键 {userId} 的队列绑定到直接交换。
我希望发送到带有路由密钥 {userId} 的直接交换的消息只能由发生绑定的特定应用服务器接收。
在这种情况下,直接交换是正确的交换吗?还是应该使用其他类型的交换?
当 websocket 进入时,我正在使用 spring-amqp 创建动态绑定
// create the RabbitMq queue and bind to it
String routingKey = MessageConstants.getRoutingKeyForUserRecommendationQueue(user);
Binding userRecommendationBinding = BindingBuilder.bind(userRecommendationsQueue).
to(directExchange).with(routingKey);
amqpAdmin.declareBinding(userRecommendationBinding);
Run Code Online (Sandbox Code Playgroud) 我有一个互联网连接,其中出口网关定期更改.在发生这种情况之前的几秒钟,我正在向我的spring应用程序收到一个事件或回调通知.我想停止我的兔子消费者和连接,然后在几秒钟后(当我的网络连接恢复时)再次连接.
我正在使用spring amqp的基于注释的方法,但我也可以切换到另一种实现方式.
我知道spring-amqp正在为我重新连接,但我想自己做这件事来控制它.我如何以编程方式彻底清除所有消费者(RabbitListeners)以及与rabbitmq代理的干净连接以及所有消费者的干净连接和启动?如果消费者能够重新创建,那就好了.如果这是不可能的,我将不得不重新初始化一些对象属性.
我正在将 Spring Boot 与 spring-rabbitmq 一起使用。我的连接工厂已配置完毕application.properties,看起来不错。
我的目标是:在启动期间检查是否存在特定名称的队列,如果不存在则创建这样的队列。我不知道如何处理它。我应该在配置类中创建什么bean?据我所知,它应该是RabbitAdmin,但我不确定。你能帮助我吗?
我正在使用 RabbitMQ 设置 Springboot 2.1.4。该项目在一个简单的代理上运行良好,但是在使用 RabbitMQ 切换到 Stomp 代理中继时,应用程序启动失败并出现以下错误。
2019-04-19 18:32:36.841 INFO 80706 --- [main] ossconcurrent.ThreadPoolTaskExecutor: 关闭 ExecutorService 'clientOutboundChannelExecutor' 2019-04-19 18:32:36.807.ThreadosExecutor --- [main] ossconcurrent.ThreadPoolTaskExecutor关闭 ExecutorService 'clientInboundChannelExecutor' 2019-04-19 18:32:36.845 INFO 80706 --- [main] o.apache.catalina.core.StandardService : 停止服务 [Tomcat] 2019-04-19 18.862: INFO6 80706 --- [主要] ConditionEvaluationReportLoggingListener:启动ApplicationContext 时出错。要显示条件报告,请在启用“调试”的情况下重新运行您的应用程序。2019-04-19 18:32:36.871 错误 80706 --- [main] osboot.SpringApplication:应用程序运行失败 org.springframework.context。ApplicationContextException:无法启动 bean 'stompBrokerRelayMessageHandler';嵌套异常是 java.lang.NoClassDefFoundError: io/netty/util/concurrent/EventExecutor at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.1.6.RELEASE.jar :5.1.6.RELEASE] 在 org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE] 在 org. springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.RELEASE] 在 org.springframework.context.support.DefaultLifecycleProcessor.startBeans (DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.6.RELEASE.jar:5.1.6.
I'm currently working on a project in Kotlin that uses rabbit with reactor to receive messages of some DTO type, and dispatch them if they are up to certain criteria. In the process of testing my code, I tried to simulate bad Message input(Since the message is coming from an external service) and see the behaviour of the subscriber. Once I got a bad input message, the subscriber stopped listening to any new input and threw the following exception:
org.springframework.messaging.MessageDeliveryException: …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用多个队列测试和测试RabbitMQ的spring-amqp,因此我为每个队列创建了兔子模板并使用它来发送消息.发送的消息成功,我可以在交换机中看到一条消息,但我在队列中看不到任何内容.我猜这是非常小的设置,但无法弄明白.
这是我的applicationContext.xml
<bean id="banchmarkConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg ref="benchmarkAmqpHost"/>
<property name="username" ref="benchmarkAmqpUser"/>
<property name="password" ref="benchmarkAmqpPass"/>
<property name="virtualHost" ref="benchmarkAmqpVHost"/>
<property name="channelCacheSize" value="10"/>
</bean>
<rabbit:template id="benchmarkAmqpTemplate"
connection-factory="banchmarkConnectionFactory"
exchange="my_exchange"
queue="BenchmarkQueue"
routing-key="BenchmarkQueue" />
<rabbit:admin connection-factory="banchmarkConnectionFactory"/>
<rabbit:queue name="BenchmarkQueue" auto-delete="true" durable="false" auto-declare="true"/>
Run Code Online (Sandbox Code Playgroud)
这是我使用benchmarkAmqpTemplate发布到队列的代码.
public class publishMessage {
@Autowired
private RabbitTemplate benchmarkAmqpTemplate;
protected void publish(String payload) {
benchmarkAmqpTemplate.setQueue("BenchmarkQueue");
benchmarkAmqpTemplate.convertAndSend("my_exchange", "BenchmarkQueue", payload);
}
}
Run Code Online (Sandbox Code Playgroud)
当我使用HelloWorld 示例时,它确实在队列中发布了一条消息,所以想知道我是否做错了什么.
更新
我能够通过direct-exchange在我的上下文xml中添加标记来解决这个问题.我的完整xml看起来像这样:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsdhttp://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<bean id="banchmarkConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg ref="benchmarkAmqpHost"/>
<property name="username" ref="benchmarkAmqpUser"/>
<property name="password" …Run Code Online (Sandbox Code Playgroud) 我有以下监听方法:
@Override
public void onMessage(Message message, Channel channel) {
try {
// do something bad :)
} catch (Exception e){
try {
long dt = null != message.getMessageProperties()
? message.getMessageProperties().getDeliveryTag()
: 0;
channel.basicReject(dt, true);
} catch(IOException io) {
logger.error("IO-COMMON", io);
}
}
}
Run Code Online (Sandbox Code Playgroud)
问题是基本拒绝不起作用,我不知道为什么.如何优雅地拒绝它?我认为,如果我拒绝一条消息,它应该被重新排队并且驻留就像缓存一样,然后再转到下一个工作者.但实际上这条消息似乎已经丢失了.
spring-rabbit ×10
spring-amqp ×7
java ×5
rabbitmq ×5
spring-boot ×4
spring ×2
kotlin ×1
netty ×1
reactor ×1
stomp ×1