标签: rabbitmq

如何实现Spring AMQP Listener Container中的并发性?

我的容器XML配置:

<rabbit:listener-container
        connection-factory="myConnectionFactory"
        acknowledge="none"
        concurrency="10"
        requeue-rejected="false">
    <rabbit:listener ref="myListener" queues="myQueue"/>
</rabbit:listener-container>
Run Code Online (Sandbox Code Playgroud)

而且myListener只是一堂课

@Component("myListener")
public class MyListener implements MessageListener {
    @Autowired
    SomeDependency dependency;
    ....
}
Run Code Online (Sandbox Code Playgroud)

concurrency="10"在我的XML中指定了.这是什么意思完全相同


我找到了一些文档.他们没有那么有用的陈述:

指定要创建的并发使用者数.默认值为1.


我感兴趣的是是否MyListener必须是线程安全的,即

  • 有很多实例创建或许多线程使用单个实例?
  • 我可以访问没有同步的实例字段吗?
  • SomeDependency dependency 一次或为每个线程/实例实例化?
  • 确实dependency需要线程安全吗?

java spring amqp rabbitmq spring-amqp

16
推荐指数
1
解决办法
1万
查看次数

RabbitMQ:快速生产者和缓慢的消费者

我有一个应用程序使用RabbitMQ作为消息队列在两个组件之间发送/接收消息:发送方和接收方.发件人以非常快的方式发送消息.接收器接收消息然后执行一些非常耗时的任务(主要是针对非常大的数据大小的数据库写入).由于接收器需要很长时间才能完成任务,然后检索队列中的下一条消息,因此发送方将继续快速填满队列.所以我的问题是:这会导致消息队列溢出吗?

消息使用者如下所示:

public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");

        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        //following takes very long time            
        dao.saveToDB(caseID);
    }
}
Run Code Online (Sandbox Code Playgroud)

消费者收到的每条消息都包含一个caseID.对于每个caseID,它会将大量数据保存到数据库中,这需要很长时间.目前只为RabbitMQ设置了一个消费者,因为生产者/消费者使用相同的队列来发布/订阅caseID.那么如何才能加快消费者吞吐量,以便消费者能够赶上生产者并避免队列中的消息溢出?我应该在消费者部分使用多线程来加快消费率吗?或者我应该使用多个消费者同时使用传入消息?或者是否存在任何异步方式让消费者异步使用消息而不等待它完成?欢迎任何建议.

java multithreading producer-consumer amqp rabbitmq

16
推荐指数
2
解决办法
1万
查看次数

如何在Spring异步MessageListener用例中发生业务异常时请求RabbitMQ重试

我有一个Spring AMQP消息监听器正在运行.

public class ConsumerService implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        try {
            testService.process(message); //This process method can throw Business Exception
        } catch (BusinessException e) {
           //Here we can just log the exception. How the retry attempt is made?
        } catch (Exception e) {
           //Here we can just log the exception.  How the retry attempt is made?
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

如您所见,在处理过程中可能会出现异常.我想重试因为Catch块中的特定错误.我不能通过onMessage中的异常.如何告诉RabbitMQ有异常并重试?

java spring rabbitmq

16
推荐指数
1
解决办法
9052
查看次数

RabbitMQ PRECONDITION_FAILED - 未知的交付标签

我们有一个 PHP 应用程序,它通过 WebSocket 连接(PHP AMQP pecl 扩展 v1.7.1 和 RabbitMQ 3.6.6)将消息从 RabbitMQ 转发到连接的设备。

消息从队列数组(每个 websocket 连接 1 个)中消费,当我们通过 websocket 收到消息已收到确认时由消费者确认(因此我们可以重新排队未在可接受的时间范围内交付的消息)。这是以非阻塞方式完成的。

99% 的情况下,这可以完美运行,但偶尔我们会收到错误“RabbitMQ PRECONDITION_FAILED - 未知的交付标签”。这将关闭通道。据我了解,此异常是以下条件之一的结果:

  1. 消息被确认或拒绝。
  2. 尝试通过消息未通过的通道进行确认。
  3. 在消息超时 (ttl) 到期后尝试确认。

我们已针对上述每种情况实施了保护,但问题仍然存在。

我意识到有许多实现细节可能会影响这一点,但在概念层面上,是否还有其他我们没有考虑过并且应该处理的失败案例?或者有没有更好的方法来实现上述功能?

php amqp rabbitmq

16
推荐指数
3
解决办法
2万
查看次数

RabbitMQ:什么是默认的x-message-ttl值

我在RabbitMQ文档中找不到安装附带的默认x-message-ttl值.

我知道如何将其设置为所需的值,但我很想知道默认值.

amqp rabbitmq

15
推荐指数
1
解决办法
1万
查看次数

RabbitMQ在生产系统上更改队列参数

我在面向服务的体系结构中使用RabbitMQ作为消息队列,其中许多单独的Web服务发布绑定到RabbitMQ队列的消息.这些队列由各种消费者订阅,这些消费者执行后台工作; RabbitMQ的一个漂亮的香草用例.

现在我想改变一些队列参数(具体来说,我想将队列绑定到一个带有某个路由键的新死信交换).我的问题是,由于几个原因,在生产系统上进行这种改变是有问题的.

对于我来说,转换到这些新队列的最佳方式是什么,而不会丢失生产系统中的消息?

我已经考虑了从版本化队列名称到使用新设置创建新vhost以进行所有更改的所有内容.

以下是我面临的一些问题:

  1. 因为RabbitMQ队列是幂等的,所以不同的Web服务在发布之前已经声明了队列(如果它们尚不存在).更改队列参数(但保持相同的路由键)后,队列声明失败,RabbitMQ关闭通道.

  2. 我想在更改队列时不丢失消息(这里我打算订阅一个保存消息然后重新发布到新队列的独占消费者).

  3. 不同出版商与消费者群体之间的一般协调(或者更好的是,避免需要协调他们的方式).

rabbitmq pika bunny

15
推荐指数
1
解决办法
9927
查看次数

rabbitmqctl错误:无法连接到节点rabbit @ myserver nodedown

我在Windows 2008 R2上运行带有Erlang OTP 17.1的RabbitMQ v3.3.5.我的Dev和QA环境是独立的.我的登台和生产环境是集群的.

我发现RabbitMQ服务正在运行时经常出现这个问题,RabbitMQ管理控制台正在查看所有内容,但是当我尝试从命令行运行rabbitmqctl时,它失败并显示节点已关闭的错误(在本地和在远程服务器).

如果我重新启动Windows服务,则会解决此问题.

我在RabbitMQ错误日志中看不到任何错误消息.最后一条消息表明节点已启动.

下面是我最近在登台窗口集群的节点2上遇到的问题的示例输出:

PS C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.3.5\sbin> .\rabbitmqctl.bat status
Status of node rabbit@MYSERVER2 ...
Error: unable to connect to node rabbit@MYSERVER2: nodedown

DIAGNOSTICS
===========

attempted to contact: [rabbit@MYSERVER2]

rabbit@MYSERVER2:
  * connected to epmd (port 4369) on MYSERVER2
  * epmd reports: node 'rabbit' not running at all
                  no other nodes on MYSERVER2
  * suggestion: start the node

current node details:
- node name: rabbitmqctl2199771@MYSERVER2
- home dir: C:\Users\RabbitMQ
- cookie hash: mn6OaTX9mS4DnZaiOzg8pA==
Run Code Online (Sandbox Code Playgroud)

此时我重新启动RabbitMQ服务,然后再试一次 …

windows erlang rabbitmq rabbitmqctl

15
推荐指数
1
解决办法
4万
查看次数

为什么我们需要在rabbitmq中使用路由键

我想知道为什么我们需要routing key将消息路由exchange到队列.我们不能使用简单队列名来路由消息.此外,在发布到多个队列的情况下,我们可以使用多个队列名称.任何人都可以指出我们实际需要路由密钥的场景,队列名称是不够的.

rabbitmq rabbitmq-exchange rabbitmqctl

15
推荐指数
2
解决办法
1万
查看次数

使用pika的python中的SparkStreaming,RabbitMQ和MQTT

只是为了让事情变得棘手,我想使用来自rabbitMQ队列的消息.现在我知道有一个针对兔子的MQTT插件(https://www.rabbitmq.com/mqtt.html).

但是,我似乎无法在Spark消耗由pika生成的消息的情况下进行示例工作.

例如,我在这里使用简单的wordcount.py程序(https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html),看看我是否可以在下面看到一个消息生产者办法:

import sys
import pika
import json
import future
import pprofile

def sendJson(json):

  connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  channel = connection.channel()

  channel.queue_declare(queue='analytics', durable=True)
  channel.queue_bind(exchange='analytics_exchange',
                       queue='analytics')

  channel.basic_publish(exchange='analytics_exchange', routing_key='analytics',body=json)
  connection.close()

if __name__ == "__main__":
  with open(sys.argv[1],'r') as json_file:
    sendJson(json_file.read())
Run Code Online (Sandbox Code Playgroud)

Sparkstreaming 消费者如下:

import sys
import operator

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")


#RabbitMQ

"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE …
Run Code Online (Sandbox Code Playgroud)

python rabbitmq mqtt pika apache-spark

15
推荐指数
1
解决办法
2764
查看次数

在docker-compose文件中提供rabbitmq.conf会出现"sed:无法重命名/ etc/rabbitmq/sedMaHqMa:设备或资源繁忙"

我的docker-compose看起来像这样:

version: '3.2'
services:
  mq:
    hostname: ${HOST_NAME}
    ports:
      - "5671:5671"
      - "5672:5672"
      - "15671:15671"
      - "15672:15672"
    environment:
      - RABBITMQ_DEFAULT_USER=${USER}
      - RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS}
    volumes:
      - ${CACERT_PEM_FILE}:/etc/rabbitmq/certs/cacert.pem
      - ${CERT_PEM_FILE}:/etc/rabbitmq/certs/cert.pem
      - ${KEY_PEM_FILE}:/etc/rabbitmq/certs/key.pem
      - ${MQ_CONFIG_FILE}:/etc/rabbitmq/rabbitmq.conf
    image: rabbitmq:3-management
Run Code Online (Sandbox Code Playgroud)

我的rabbitmq.conf看起来像这样:

listeners.tcp.default = 5672
listeners.ssl.default = 5671

ssl_options.cacertfile = /etc/rabbitmq/certs/cacert.pem
ssl_options.certfile = /etc/rabbitmq/certs/cert.pem
ssl_options.keyfile = /etc/rabbitmq/certs/key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = false

ssl_options.versions.1 = tlsv1.2
ssl_options.versions.2 = tlsv1.1
Run Code Online (Sandbox Code Playgroud)

但是,当我尝试这样做时,docker-compose up我收到以下错误:

cannot rename /etc/rabbitmq/sedMaHqMa: Device or resource busy

我尝试使用旧格式的配置文件(rabbitmq.config)并且它没有给我这个错误,但是我需要使用新格式,因为我需要在启动期间通过env提供密码.变量.

编辑2018年2月20日

以下是Rabbitmq docker映像中当前可用环境变量的列表,它们足以为AMQP和HTTP(管理API和Web控制台)设置TLS

复制它们以防链接断开:

RABBITMQ_DEFAULT_PASS
RABBITMQ_DEFAULT_USER …
Run Code Online (Sandbox Code Playgroud)

config rabbitmq docker-compose

15
推荐指数
1
解决办法
1910
查看次数