标签: spring-rabbitmq

分布式数据库事务中的RabbitMQ和交付保证

我试图了解在分布式数据库事务的上下文中处理RabbitMQ交付的正确模式是什么.

为了简单起见,我将用伪代码来说明我的想法,但实际上我正在使用Spring AMQP来实现这些想法.

什么都喜欢

void foo(message) {
   processMessageInDatabaseTransaction(message);
   sendMessageToRabbitMQ(message);
}
Run Code Online (Sandbox Code Playgroud)

凡时候,我们到达sendMessageToRabbitMQ()processMessageInDatabaseTransaction()已经成功提交了对数据库的更改,或到达消息发送代码之前的异常被抛出.

我知道,因为sendMessageToRabbitMQ()我可以使用Rabbit事务发布者确认,以保证Rabbit得到我的消息.

我感兴趣的是理解当事情向南发生时会发生什么,即当数据库事务成功时,但确认在一定时间后(发布者确认)或Rabbit事务未能提交(使用Rabbit事务)时未到达.

一旦发生这种情况,保证传递信息的正确模式是什么?

当然,在开发了幂等消费者之后,我认为我可以重新发送消息,直到Rabbit确认成功为止:

void foo(message) {
   processMessageInDatabaseTransaction(message);
   retryUntilSuccessFull {
      sendMessagesToRabbitMQ(message);
   }
}
Run Code Online (Sandbox Code Playgroud)

但是这种模式有一些我不喜欢的缺点,首先,如果故障延长,我的线程将开始阻塞,我的系统最终会变得无法响应.其次,如果我的系统崩溃或关闭会发生什么?我永远不会传递这些消息,因为它们会丢失.

所以,我想,好吧,我将首先将我的消息写入数据库,处于挂起状态,然后从那里发布我的待处理消息:

void foo(message) {
   //transaction commits leaving message in pending status
   processMessageInDatabaseTransaction(message);
}

@Poller(every="10 seconds")
void bar() {
   for(message in readPendingMessagesFromDbStore()) {
      sendPendingMessageToRabbitMQ(message);
      if(confirmed) {
          acknowledgeMessageInDatabase(message); 
      }
   }
}
Run Code Online (Sandbox Code Playgroud)

如果我无法确认数据库中的消息,可能会多次发送消息.

但现在我已经介绍了其他问题:

  • 需要从数据库执行I/O以发布消息,即99%的时间可以立即成功发布,而无需检查数据库.
  • 由于现在我增加了消息发布的延迟,因此难以使轮询器更接近实时传送.
  • 也许还有其他一些复杂因素,例如保证事件按顺序传递,轮询执行器相互衔接,多个轮询器等等.

然后我想好了,我可以让它变得更复杂,我可以从数据库发布直到我赶上事件的实时流然后发布实时,即保持大小为b的缓冲区(循环缓冲区)为我根据页面检查该消息是否在缓冲区中.如果是,则切换到实时订阅.

到目前为止,我意识到如何做到这一点并不是很明显,所以我得出结论,我需要了解解决这个问题的正确模式是什么.

那么,有没有人建议正确的方法是什么?

amqp rabbitmq spring-amqp spring-rabbitmq

5
推荐指数
1
解决办法
1108
查看次数

RabbitMQ健康检查失败

我正在使用Spring Boot并在docker容器中运行RabbitMQ。除兔子健康检查问题外,其他所有方法都运作良好。这意味着我无法将来自Web应用程序的消息排队。

这是我在应用中遇到的确切错误:

[n(14)-127.0.0.1] o.s.b.a.amqp.RabbitHealthIndicator       : Rabbit health check failed
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:74) ~[spring-rabbit-2.0.3.RELEASE.jar:2.0.3.RELEASE]
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

我该如何解决?

java spring rabbitmq spring-boot spring-rabbitmq

5
推荐指数
0
解决办法
938
查看次数

如何启动,停止和重新连接spring rabbitmq监听器容器及其连接?

我有一个互联网连接,其中出口网关定期更改.在发生这种情况之前的几秒钟,我正在向我的spring应用程序收到一个事件或回调通知.我想停止我的兔子消费者和连接,然后在几秒钟后(当我的网络连接恢复时)再次连接.

我正在使用spring amqp的基于注释的方法,但我也可以切换到另一种实现方式.

我知道spring-amqp正在为我重新连接,但我想自己做这件事来控制它.我如何以编程方式彻底清除所有消费者(RabbitListeners)以及与rabbitmq代理的干净连接以及所有消费者的干净连接和启动?如果消费者能够重新创建,那就好了.如果这是不可能的,我将不得不重新初始化一些对象属性.

spring-rabbit spring-amqp spring-boot spring-rabbitmq

4
推荐指数
1
解决办法
2916
查看次数

在RabbitMQ中手动确认消息

以前我正在读取队列中存在的所有消息,但现在我必须根据用户选择(计数)返回特定数量的消息.

我尝试相应地更改for循环,但由于自动确认,它读取所有消息.所以我尝试在配置文件中将其更改为手动.

在我的程序中如何在阅读msg后手动确认消息(目前我正在使用AmqpTemplate接收并且我没有频道参考)?

    Properties properties = admin.getQueueProperties("queue_name");
    if(null != properties)
    {
        Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());          
        while(messageCount > 0)
        {
            Message msg = amqpTemplate.receive(queue_name);
            String value = new String(msg.getBody());

            valueList.add(value);
            messageCount--;
        }
}
Run Code Online (Sandbox Code Playgroud)

任何帮助都非常值得感谢,提前谢谢.

rabbitmq spring-amqp spring-rabbitmq

3
推荐指数
1
解决办法
5807
查看次数

Spring Boot WebSocket Rabbitmq Stomp Broker不保持连接

找不到与此特定错误相关的任何其他问题.

我似乎无法用RabbitMQ连接我的Spring Boot WebSocket演示项目.请注意,使用"简单"代理时一切正常,但是当使用Rabbit连接stomp代理时,我收到以下错误(继续尝试重新连接):

Java HotSpot(TM) Client VM warning: You have loaded library /tmp/libnetty-transport-native-epoll8916930274033685449.so which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
2016-03-07 22:35:13.993  INFO 4047 --- [           main] o.s.m.s.s.StompBrokerRelayMessageHandler : Started.
2016-03-07 22:35:14.045  INFO 4047 --- [eactor-tcp-io-1] r.io.net.impl.netty.tcp.NettyTcpClient   : CONNECTED: [id: 0x034a269f, /127.0.0.1:39955 => /127.0.0.1:25672]
2016-03-07 22:35:14.151  INFO 4047 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer …
Run Code Online (Sandbox Code Playgroud)

rabbitmq spring-boot spring-websocket java-websocket spring-rabbitmq

3
推荐指数
1
解决办法
2311
查看次数

Spring amqp:如何在MessageListenerAdapter中读取MessageProperties

如果我在其签名中添加MessageProperties,handleMessage方法不会从队列中获取消息.如果没有MessageProperties,它可以正常工作.

如何在MessageListenerAdapter的handleMessage中获取MessageProperties?

public class EventMessageAdapter {

  public void handleMessage(MessageProperties messageProperties, Event event)    {
  ...
  String id = messageProperties.getHeaders().get("key");
}
Run Code Online (Sandbox Code Playgroud)

rabbitmq spring-amqp spring-rabbitmq

3
推荐指数
1
解决办法
2794
查看次数

Spring-boot-starter RabbitMQ全局错误处理

我使用spring-boot-starter-amqp 1.4.2.Producer和consumer工作正常,但有时传入的JSON消息的语法不正确.这导致以下(正确)异常:

org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
Caused by:  org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token...
Run Code Online (Sandbox Code Playgroud)

将来我可能面临更多例外.所以我想配置一个全局错误处理程序,这样如果任何一个消费者都有任何异常我可以全局处理它.

注意:在这种情况下,消息根本没有到达消费者.我希望在整个消费者中处理这些类型的异常.

请找到以下代码:

RabbitConfiguration.java

@Configuration
@EnableRabbit
public class RabbitMqConfiguration {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Bean
    public MessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    @Primary
    public RabbitTemplate rabbitTemplate()
    {
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

}
Run Code Online (Sandbox Code Playgroud)

消费者

@RabbitListener(
        id = "book_queue",
        bindings = @QueueBinding(
                value = @Queue(value = "book.queue", durable …
Run Code Online (Sandbox Code Playgroud)

rabbitmq spring-rabbit spring-boot rabbitmq-exchange spring-rabbitmq

3
推荐指数
1
解决办法
9297
查看次数

SpringAMQP RabbitMQ如何在没有Exchange的情况下直接发送到Queue

我正在使用带有Rabbit模板的SpringAMQP.如何将邮件直接发送到省略Exchange的队列?我该怎么做?

spring send rabbitmq spring-amqp spring-rabbitmq

3
推荐指数
1
解决办法
2811
查看次数

Rabbit监听器注释从yaml获取队列名称

我目前将我的兔子监听器注释设置为:

@RabbitListener(queues ="my-queue")

是不是可以从我的yaml文件中提取队列名称.我想这样做的原因是,我可以通过更改yaml文件中的队列名称将队列更改为测试队列以进行集成测试.看来注释必须接受一个常量字符串,有没有办法围绕这个?谢谢,

rabbitmq spring-boot spring-rabbitmq

3
推荐指数
1
解决办法
2730
查看次数

SPRINGAMQP或RabbitMq Java API

我是rabbitMq和Spring AMQP的新手.我正在从头开始构建一个新项目.这里有一个组件我们使用rabbit-Mq作为消息代理.

在这个项目中,主要是所有的开发都发生在Java中.我们通常将Spring用于某些组件.

现在,虽然用Erlang编写的rabbit-mq确实提供了一个干净的java Api.还有spring amqp,它提供了一个很好的接口来支持松散耦合(通过AMQPTemplate等).

我认为使用SpringAMQP的一个优点是,由于上面提到的松散耦合,明天如果我们必须使用AMQP的任何其他实现而不是兔子Mq(spring-rabbit)我不必更改我的代码.但正如我所看到的,今天的实施是针对兔子MQ并且除非特殊情况,我在这里看不到任何事情.我们应该在近乎可预见的未来使用rabbit-Mq,也不确定除了rabbitMq之外是否还有其他消息代理实现在spring amqp方面.

我认为缺点是我完全不了解RabbitMq提供的实际客户端API,因为Spring AMQP给出了抽象(除非我决定深入挖掘).

在这种情况下,SPRING AMQP提供的传统rabbitMq Java API还有其他优势吗?

谢谢

rabbitmq spring-rabbit spring-amqp spring-rabbitmq

2
推荐指数
2
解决办法
824
查看次数