我的容器XML配置:
<rabbit:listener-container
connection-factory="myConnectionFactory"
acknowledge="none"
concurrency="10"
requeue-rejected="false">
<rabbit:listener ref="myListener" queues="myQueue"/>
</rabbit:listener-container>
Run Code Online (Sandbox Code Playgroud)
而且myListener只是一堂课
@Component("myListener")
public class MyListener implements MessageListener {
@Autowired
SomeDependency dependency;
....
}
Run Code Online (Sandbox Code Playgroud)
我concurrency="10"在我的XML中指定了.这是什么意思完全相同?
我找到了一些文档.他们没有那么有用的陈述:
指定要创建的并发使用者数.默认值为1.
我感兴趣的是是否MyListener必须是线程安全的,即
SomeDependency dependency 一次或为每个线程/实例实例化?dependency需要线程安全吗?我有一个应用程序使用RabbitMQ作为消息队列在两个组件之间发送/接收消息:发送方和接收方.发件人以非常快的方式发送消息.接收器接收消息然后执行一些非常耗时的任务(主要是针对非常大的数据大小的数据库写入).由于接收器需要很长时间才能完成任务,然后检索队列中的下一条消息,因此发送方将继续快速填满队列.所以我的问题是:这会导致消息队列溢出吗?
消息使用者如下所示:
public void onMessage() throws IOException, InterruptedException {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
JSONObject json = new JSONObject(message);
String caseID = json.getString("caseID");
//following takes very long time
dao.saveToDB(caseID);
}
}
Run Code Online (Sandbox Code Playgroud)
消费者收到的每条消息都包含一个caseID.对于每个caseID,它会将大量数据保存到数据库中,这需要很长时间.目前只为RabbitMQ设置了一个消费者,因为生产者/消费者使用相同的队列来发布/订阅caseID.那么如何才能加快消费者吞吐量,以便消费者能够赶上生产者并避免队列中的消息溢出?我应该在消费者部分使用多线程来加快消费率吗?或者我应该使用多个消费者同时使用传入消息?或者是否存在任何异步方式让消费者异步使用消息而不等待它完成?欢迎任何建议.
我有一个Spring AMQP消息监听器正在运行.
public class ConsumerService implements MessageListener {
@Autowired
RabbitTemplate rabbitTemplate;
@Override
public void onMessage(Message message) {
try {
testService.process(message); //This process method can throw Business Exception
} catch (BusinessException e) {
//Here we can just log the exception. How the retry attempt is made?
} catch (Exception e) {
//Here we can just log the exception. How the retry attempt is made?
}
}
}
Run Code Online (Sandbox Code Playgroud)
如您所见,在处理过程中可能会出现异常.我想重试因为Catch块中的特定错误.我不能通过onMessage中的异常.如何告诉RabbitMQ有异常并重试?
我们有一个 PHP 应用程序,它通过 WebSocket 连接(PHP AMQP pecl 扩展 v1.7.1 和 RabbitMQ 3.6.6)将消息从 RabbitMQ 转发到连接的设备。
消息从队列数组(每个 websocket 连接 1 个)中消费,当我们通过 websocket 收到消息已收到确认时由消费者确认(因此我们可以重新排队未在可接受的时间范围内交付的消息)。这是以非阻塞方式完成的。
99% 的情况下,这可以完美运行,但偶尔我们会收到错误“RabbitMQ PRECONDITION_FAILED - 未知的交付标签”。这将关闭通道。据我了解,此异常是以下条件之一的结果:
我们已针对上述每种情况实施了保护,但问题仍然存在。
我意识到有许多实现细节可能会影响这一点,但在概念层面上,是否还有其他我们没有考虑过并且应该处理的失败案例?或者有没有更好的方法来实现上述功能?
我在RabbitMQ文档中找不到安装附带的默认x-message-ttl值.
我知道如何将其设置为所需的值,但我很想知道默认值.
我在面向服务的体系结构中使用RabbitMQ作为消息队列,其中许多单独的Web服务发布绑定到RabbitMQ队列的消息.这些队列由各种消费者订阅,这些消费者执行后台工作; RabbitMQ的一个漂亮的香草用例.
现在我想改变一些队列参数(具体来说,我想将队列绑定到一个带有某个路由键的新死信交换).我的问题是,由于几个原因,在生产系统上进行这种改变是有问题的.
对于我来说,转换到这些新队列的最佳方式是什么,而不会丢失生产系统中的消息?
我已经考虑了从版本化队列名称到使用新设置创建新vhost以进行所有更改的所有内容.
以下是我面临的一些问题:
因为RabbitMQ队列是幂等的,所以不同的Web服务在发布之前已经声明了队列(如果它们尚不存在).更改队列参数(但保持相同的路由键)后,队列声明失败,RabbitMQ关闭通道.
我想在更改队列时不丢失消息(这里我打算订阅一个保存消息然后重新发布到新队列的独占消费者).
不同出版商与消费者群体之间的一般协调(或者更好的是,避免需要协调他们的方式).
我在Windows 2008 R2上运行带有Erlang OTP 17.1的RabbitMQ v3.3.5.我的Dev和QA环境是独立的.我的登台和生产环境是集群的.
我发现RabbitMQ服务正在运行时经常出现这个问题,RabbitMQ管理控制台正在查看所有内容,但是当我尝试从命令行运行rabbitmqctl时,它失败并显示节点已关闭的错误(在本地和在远程服务器).
如果我重新启动Windows服务,则会解决此问题.
我在RabbitMQ错误日志中看不到任何错误消息.最后一条消息表明节点已启动.
下面是我最近在登台窗口集群的节点2上遇到的问题的示例输出:
PS C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.3.5\sbin> .\rabbitmqctl.bat status
Status of node rabbit@MYSERVER2 ...
Error: unable to connect to node rabbit@MYSERVER2: nodedown
DIAGNOSTICS
===========
attempted to contact: [rabbit@MYSERVER2]
rabbit@MYSERVER2:
* connected to epmd (port 4369) on MYSERVER2
* epmd reports: node 'rabbit' not running at all
no other nodes on MYSERVER2
* suggestion: start the node
current node details:
- node name: rabbitmqctl2199771@MYSERVER2
- home dir: C:\Users\RabbitMQ
- cookie hash: mn6OaTX9mS4DnZaiOzg8pA==
Run Code Online (Sandbox Code Playgroud)
此时我重新启动RabbitMQ服务,然后再试一次 …
我想知道为什么我们需要routing key将消息路由exchange到队列.我们不能使用简单队列名来路由消息.此外,在发布到多个队列的情况下,我们可以使用多个队列名称.任何人都可以指出我们实际需要路由密钥的场景,队列名称是不够的.
只是为了让事情变得棘手,我想使用来自rabbitMQ队列的消息.现在我知道有一个针对兔子的MQTT插件(https://www.rabbitmq.com/mqtt.html).
但是,我似乎无法在Spark消耗由pika生成的消息的情况下进行示例工作.
例如,我在这里使用简单的wordcount.py程序(https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html),看看我是否可以在下面看到一个消息生产者办法:
import sys
import pika
import json
import future
import pprofile
def sendJson(json):
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='analytics', durable=True)
channel.queue_bind(exchange='analytics_exchange',
queue='analytics')
channel.basic_publish(exchange='analytics_exchange', routing_key='analytics',body=json)
connection.close()
if __name__ == "__main__":
with open(sys.argv[1],'r') as json_file:
sendJson(json_file.read())
Run Code Online (Sandbox Code Playgroud)
Sparkstreaming 消费者如下:
import sys
import operator
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")
#RabbitMQ
"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE …Run Code Online (Sandbox Code Playgroud) 我的docker-compose看起来像这样:
version: '3.2'
services:
mq:
hostname: ${HOST_NAME}
ports:
- "5671:5671"
- "5672:5672"
- "15671:15671"
- "15672:15672"
environment:
- RABBITMQ_DEFAULT_USER=${USER}
- RABBITMQ_DEFAULT_PASS=${RABBITMQ_DEFAULT_PASS}
volumes:
- ${CACERT_PEM_FILE}:/etc/rabbitmq/certs/cacert.pem
- ${CERT_PEM_FILE}:/etc/rabbitmq/certs/cert.pem
- ${KEY_PEM_FILE}:/etc/rabbitmq/certs/key.pem
- ${MQ_CONFIG_FILE}:/etc/rabbitmq/rabbitmq.conf
image: rabbitmq:3-management
Run Code Online (Sandbox Code Playgroud)
我的rabbitmq.conf看起来像这样:
listeners.tcp.default = 5672
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/certs/cacert.pem
ssl_options.certfile = /etc/rabbitmq/certs/cert.pem
ssl_options.keyfile = /etc/rabbitmq/certs/key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = false
ssl_options.versions.1 = tlsv1.2
ssl_options.versions.2 = tlsv1.1
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试这样做时,docker-compose up我收到以下错误:
cannot rename /etc/rabbitmq/sedMaHqMa: Device or resource busy
我尝试使用旧格式的配置文件(rabbitmq.config)并且它没有给我这个错误,但是我需要使用新格式,因为我需要在启动期间通过env提供密码.变量.
编辑2018年2月20日
以下是Rabbitmq docker映像中当前可用环境变量的列表,它们足以为AMQP和HTTP(管理API和Web控制台)设置TLS
复制它们以防链接断开:
RABBITMQ_DEFAULT_PASS
RABBITMQ_DEFAULT_USER …Run Code Online (Sandbox Code Playgroud) rabbitmq ×10
amqp ×4
java ×3
pika ×2
rabbitmqctl ×2
spring ×2
apache-spark ×1
bunny ×1
config ×1
erlang ×1
mqtt ×1
php ×1
python ×1
spring-amqp ×1
windows ×1