我试图了解在分布式数据库事务的上下文中处理RabbitMQ交付的正确模式是什么.
为了简单起见,我将用伪代码来说明我的想法,但实际上我正在使用Spring AMQP来实现这些想法.
什么都喜欢
void foo(message) {
processMessageInDatabaseTransaction(message);
sendMessageToRabbitMQ(message);
}
Run Code Online (Sandbox Code Playgroud)
凡时候,我们到达sendMessageToRabbitMQ()了processMessageInDatabaseTransaction()已经成功提交了对数据库的更改,或到达消息发送代码之前的异常被抛出.
我知道,因为sendMessageToRabbitMQ()我可以使用Rabbit事务或发布者确认,以保证Rabbit得到我的消息.
我感兴趣的是理解当事情向南发生时会发生什么,即当数据库事务成功时,但确认在一定时间后(发布者确认)或Rabbit事务未能提交(使用Rabbit事务)时未到达.
一旦发生这种情况,保证传递信息的正确模式是什么?
当然,在开发了幂等消费者之后,我认为我可以重新发送消息,直到Rabbit确认成功为止:
void foo(message) {
processMessageInDatabaseTransaction(message);
retryUntilSuccessFull {
sendMessagesToRabbitMQ(message);
}
}
Run Code Online (Sandbox Code Playgroud)
但是这种模式有一些我不喜欢的缺点,首先,如果故障延长,我的线程将开始阻塞,我的系统最终会变得无法响应.其次,如果我的系统崩溃或关闭会发生什么?我永远不会传递这些消息,因为它们会丢失.
所以,我想,好吧,我将首先将我的消息写入数据库,处于挂起状态,然后从那里发布我的待处理消息:
void foo(message) {
//transaction commits leaving message in pending status
processMessageInDatabaseTransaction(message);
}
@Poller(every="10 seconds")
void bar() {
for(message in readPendingMessagesFromDbStore()) {
sendPendingMessageToRabbitMQ(message);
if(confirmed) {
acknowledgeMessageInDatabase(message);
}
}
}
Run Code Online (Sandbox Code Playgroud)
如果我无法确认数据库中的消息,可能会多次发送消息.
但现在我已经介绍了其他问题:
然后我想好了,我可以让它变得更复杂,我可以从数据库发布直到我赶上事件的实时流然后发布实时,即保持大小为b的缓冲区(循环缓冲区)为我根据页面检查该消息是否在缓冲区中.如果是,则切换到实时订阅.
到目前为止,我意识到如何做到这一点并不是很明显,所以我得出结论,我需要了解解决这个问题的正确模式是什么.
那么,有没有人建议正确的方法是什么?
我正在使用Spring Boot并在docker容器中运行RabbitMQ。除兔子健康检查问题外,其他所有方法都运作良好。这意味着我无法将来自Web应用程序的消息排队。
这是我在应用中遇到的确切错误:
[n(14)-127.0.0.1] o.s.b.a.amqp.RabbitHealthIndicator : Rabbit health check failed
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:74) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
Run Code Online (Sandbox Code Playgroud)
我该如何解决?
我有一个互联网连接,其中出口网关定期更改.在发生这种情况之前的几秒钟,我正在向我的spring应用程序收到一个事件或回调通知.我想停止我的兔子消费者和连接,然后在几秒钟后(当我的网络连接恢复时)再次连接.
我正在使用spring amqp的基于注释的方法,但我也可以切换到另一种实现方式.
我知道spring-amqp正在为我重新连接,但我想自己做这件事来控制它.我如何以编程方式彻底清除所有消费者(RabbitListeners)以及与rabbitmq代理的干净连接以及所有消费者的干净连接和启动?如果消费者能够重新创建,那就好了.如果这是不可能的,我将不得不重新初始化一些对象属性.
以前我正在读取队列中存在的所有消息,但现在我必须根据用户选择(计数)返回特定数量的消息.
我尝试相应地更改for循环,但由于自动确认,它读取所有消息.所以我尝试在配置文件中将其更改为手动.
在我的程序中如何在阅读msg后手动确认消息(目前我正在使用AmqpTemplate接收并且我没有频道参考)?
Properties properties = admin.getQueueProperties("queue_name");
if(null != properties)
{
Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());
while(messageCount > 0)
{
Message msg = amqpTemplate.receive(queue_name);
String value = new String(msg.getBody());
valueList.add(value);
messageCount--;
}
}
Run Code Online (Sandbox Code Playgroud)
任何帮助都非常值得感谢,提前谢谢.
找不到与此特定错误相关的任何其他问题.
我似乎无法用RabbitMQ连接我的Spring Boot WebSocket演示项目.请注意,使用"简单"代理时一切正常,但是当使用Rabbit连接stomp代理时,我收到以下错误(继续尝试重新连接):
Java HotSpot(TM) Client VM warning: You have loaded library /tmp/libnetty-transport-native-epoll8916930274033685449.so which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
2016-03-07 22:35:13.993 INFO 4047 --- [ main] o.s.m.s.s.StompBrokerRelayMessageHandler : Started.
2016-03-07 22:35:14.045 INFO 4047 --- [eactor-tcp-io-1] r.io.net.impl.netty.tcp.NettyTcpClient : CONNECTED: [id: 0x034a269f, /127.0.0.1:39955 => /127.0.0.1:25672]
2016-03-07 22:35:14.151 INFO 4047 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer …Run Code Online (Sandbox Code Playgroud) rabbitmq spring-boot spring-websocket java-websocket spring-rabbitmq
如果我在其签名中添加MessageProperties,handleMessage方法不会从队列中获取消息.如果没有MessageProperties,它可以正常工作.
如何在MessageListenerAdapter的handleMessage中获取MessageProperties?
public class EventMessageAdapter {
public void handleMessage(MessageProperties messageProperties, Event event) {
...
String id = messageProperties.getHeaders().get("key");
}
Run Code Online (Sandbox Code Playgroud) 我使用spring-boot-starter-amqp 1.4.2.Producer和consumer工作正常,但有时传入的JSON消息的语法不正确.这导致以下(正确)异常:
org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token...
Run Code Online (Sandbox Code Playgroud)
将来我可能面临更多例外.所以我想配置一个全局错误处理程序,这样如果任何一个消费者都有任何异常我可以全局处理它.
注意:在这种情况下,消息根本没有到达消费者.我希望在整个消费者中处理这些类型的异常.
请找到以下代码:
RabbitConfiguration.java
@Configuration
@EnableRabbit
public class RabbitMqConfiguration {
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Bean
public MessageConverter jsonMessageConverter()
{
return new Jackson2JsonMessageConverter();
}
@Bean
@Primary
public RabbitTemplate rabbitTemplate()
{
RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
template.setMessageConverter(jsonMessageConverter());
return template;
}
}
Run Code Online (Sandbox Code Playgroud)
消费者
@RabbitListener(
id = "book_queue",
bindings = @QueueBinding(
value = @Queue(value = "book.queue", durable …Run Code Online (Sandbox Code Playgroud) rabbitmq spring-rabbit spring-boot rabbitmq-exchange spring-rabbitmq
我正在使用带有Rabbit模板的SpringAMQP.如何将邮件直接发送到省略Exchange的队列?我该怎么做?
我目前将我的兔子监听器注释设置为:
@RabbitListener(queues ="my-queue")
是不是可以从我的yaml文件中提取队列名称.我想这样做的原因是,我可以通过更改yaml文件中的队列名称将队列更改为测试队列以进行集成测试.看来注释必须接受一个常量字符串,有没有办法围绕这个?谢谢,
我是rabbitMq和Spring AMQP的新手.我正在从头开始构建一个新项目.这里有一个组件我们使用rabbit-Mq作为消息代理.
在这个项目中,主要是所有的开发都发生在Java中.我们通常将Spring用于某些组件.
现在,虽然用Erlang编写的rabbit-mq确实提供了一个干净的java Api.还有spring amqp,它提供了一个很好的接口来支持松散耦合(通过AMQPTemplate等).
我认为使用SpringAMQP的一个优点是,由于上面提到的松散耦合,明天如果我们必须使用AMQP的任何其他实现而不是兔子Mq(spring-rabbit)我不必更改我的代码.但正如我所看到的,今天的实施是针对兔子MQ并且除非特殊情况,我在这里看不到任何事情.我们应该在近乎可预见的未来使用rabbit-Mq,也不确定除了rabbitMq之外是否还有其他消息代理实现在spring amqp方面.
我认为缺点是我完全不了解RabbitMq提供的实际客户端API,因为Spring AMQP给出了抽象(除非我决定深入挖掘).
在这种情况下,SPRING AMQP提供的传统rabbitMq Java API还有其他优势吗?
谢谢
spring-rabbitmq ×10
rabbitmq ×9
spring-amqp ×6
spring-boot ×5
spring ×2
amqp ×1
java ×1
send ×1