RabbitMQ java客户端停止消费消息

Alk*_*ris 5 java heroku rabbitmq spring-rabbit spring-boot

我的应用程序使用来自 RabbitMQ 的一些消息并处理它们。我有大约 10 个队列,每个队列最多有 10 个消费者(线程)。我的预取为 5。我使用 CloudAMQP 插件(RabbitMQ 作为服务)在 Heroku 中运行我的设置。

我正在使用默认的心跳和连接超时设置(60 秒)运行。

我的java应用程序是一个使用spring-rabbit库的spring boot应用程序。

版本:

RabbitMQ 3.5.3 
Erlang 17.5.3
Java 1.8
Spring boot 1.3.2.RELEASE
Spring rabbit 1.5.3.RELEASE
Run Code Online (Sandbox Code Playgroud)

问题在于,对于一​​个特定队列的消费者来说,在一段时间后停止消费消息。当我重新启动我的 java 应用程序时,一切正常。但其他队列正在正常消耗。应用程序端没有错误。在兔子这边的日志流中,我看到一些条目,例如

= REPORT==== 2016-08-02 15:53:32 UTC ===
closing AMQP connection <SOMETHING> (SOMETHING_ELSE -> SOMETHING_ELSE_ELSE):
{heartbeat_timeout,running}
Run Code Online (Sandbox Code Playgroud)

我无法在本地或 Heroku 的测试环境中重现。

更新

下面的代码可以在AMQConnection.class

int heartbeat = negotiatedMaxValue(this.requestedHeartbeat,
                                   connTune.getHeartbeat());


private static int negotiatedMaxValue(int clientValue, int serverValue) {
        return (clientValue == 0 || serverValue == 0) ?
            Math.max(clientValue, serverValue) :
            Math.min(clientValue, serverValue);
}
Run Code Online (Sandbox Code Playgroud)

我无法将心跳值增加到 60 秒以上(这是我从服务器获得的值)。

rde*_*ges 4

不幸的是,这似乎是一个网络问题。这可能是由于以下几个原因造成的:

  • CloudAMQP 服务出现一些问题并正在终止您的连接(不太可能,因为您的其他使用者工作正常)。
  • 您的 CloudAMQP 计划不允许您想要的并发连接数。您是否检查过以确保您的计划足够高以支持所有消费者?https://elements.heroku.com/addons/cloudamqp
  • 您与相关消费者的 Heroku dyno 正在重新启动,这会断开您的连接。Heroku dynos 定期重新启动。如果您的测功机无法正常重启,您可能需要调查原因。
  • 您的 Heroku dynos 之一出现网络问题(在这种情况下,它可能会在无需您干预的情况下自行重新启动)。

强制所有测功机重新启动的一种方法是运行$ heroku ps:restart. 这将迫使 Heroku 重新启动您的 dynos,这通常意味着将它们移动到新的 EC2 主机。如果这是一次性问题,这可能会有所帮助。