我正在使用Pika处理来自RabbitMQ的数据.由于我似乎遇到了不同类型的问题,我决定编写一个小型测试应用程序来查看如何处理断开连接.
我写了这个测试应用程序,其中包括:
我注意到的是2个问题:
这可能是因为网络问题,数据包丢失,但我发现连接不是很强大.
当脚本在RabbitMQ服务器上本地运行并且我杀死RabbitMQ时,脚本退出时出现错误:"ERROR pika SelectConnection:3:104上的套接字错误"
所以看起来我不能让重新连接策略按原样运行.有人可以查看代码,看看我做错了什么?
谢谢,
松鸦
#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock
class Broker(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.logging = logging.getLogger(__name__)
self.to_broker = Queue.Queue(0)
self.from_broker = Queue.Queue(0)
self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
self.srs = SimpleReconnectionStrategy()
self.properties = pika.BasicProperties(delivery_mode=2)
self.connection = None
while True:
try:
self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs)
break
except Exception as err:
self.logging.warning('Cant …Run Code Online (Sandbox Code Playgroud) 我在rabbitmq上使用芹菜.我已经向队列发送了数千条消息,并且它们正在被成功处理,一切正常.但是,几个rabbitmq队列中的消息数量正在增长(队列中有数十万个项目).队列命名celeryev.[...](见下面的截图).这是恰当的行为吗?这些队列的目的是什么,不应该定期清除它们?有没有办法更频繁地清除它们,我认为它们占用了相当多的磁盘空间.

RabbitMQ .NET客户端是否有任何异步支持?我希望能够异步连接和使用消息,但到目前为止还没有找到办法.
(对于消费消息,我可以使用EventingBasicConsumer,但这不是一个完整的解决方案.)
为了给出一些上下文,这是我现在如何使用RabbitMQ的一个例子(代码取自我的博客):
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("testqueue", true, false, false, null);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume("testqueue", true, consumer);
Console.ReadLine();
}
}
Run Code Online (Sandbox Code Playgroud) 在REST世界中,我们有类似Swagger规范的东西,它完全描述了REST接口边界(客户端和服务器之间)的合同.这些Swagger规范可用于自动生成REST客户端,也可用于为REST API使用者自动生成文档.此外,这些Swagger规范也是CI和版本化API的宝贵资产.
我想知道异步发布订阅世界中是否存在类似的解决方案:让我们说一下RabbitMQ上的典型AMQP消费者/制作人....
最好的祝福,
巴特
我有RabbitMQ集群,生产中有两个节点,集群正在打破这些错误消息:
= ERROR REPORT ==== 23-Dec-2011 ::
04:21:34 ===**节点兔子@ rabbitmq02没有响应**
**删除(超时)连接**= INFO REPORT ==== 23-Dec-2011 ::
04:21:35 === 节点兔子@ rabbitmq02丢失'兔子'= ERROR REPORT ==== 2011年12月23日::
04:21:49 === Mnesia(兔子@ rabbitmq01):**错误**mnesia_event得到{inconsistent_database,running_partitioned_network,rabbit @ rabbitmq02}
我试图通过使用"tcpkill"终止两个节点之间的连接来模拟问题,集群已断开连接,并且令人惊讶的是两个节点没有尝试重新连接!
当群集中断时,haproxy负载均衡器仍然将两个节点标记为活动并向两个节点发送请求,尽管它们不在群集中.
我的问题:
如果节点配置为群集,当我遇到网络故障时,他们为什么不尝试重新连接?
如何识别损坏的集群并关闭其中一个节点?分别使用两个节点时遇到一致性问题.
我是RabbitMQ的新手,并且想知道我正在考虑解决这个问题的好方法.我想创建一个订阅队列的服务,只提取符合特定条件的消息; 例如,如果消息中包含特定主题标题.
我还在学习RabbitMQ,并且正在寻找如何解决这个问题的技巧.我的问题包括:消费者如何仅从队列中提取特定消息?生产者如何在消息中设置主题标题(如果这甚至是正确的术语?)
我创建了一个简单的发布者和使用者在队列上订阅的消费者basic.consume.
我的消费者在作业无异常运行时确认消息.每当我遇到异常时,我都不会收到消息并提前返回.只有已确认的消息才会从队列中消失,因此工作正常.
现在我希望消费者再次接收失败的消息,但重新生成这些消息的唯一方法是重新启动消费者.
我该如何处理这个用例?
设置代码
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName('my-exchange');
$exchange->setType('fanout');
$exchange->declare();
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declare();
$queue->bind('my-exchange');
Run Code Online (Sandbox Code Playgroud)
消费者代码
$queue->consume(array($this, 'callback'));
public function callback(AMQPEnvelope $msg)
{
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return;
}
return $queue->ack($msg->getDeliveryTag());
}
Run Code Online (Sandbox Code Playgroud)
制片人代码
$exchange->publish('message');
Run Code Online (Sandbox Code Playgroud) 在本指南中https://www.rabbitmq.com/api-guide.html RabbitMQ人员说:
通道和并发注意事项(线程安全)
不得在线程之间共享通道实例.应用程序应该更喜欢每个线程使用一个Channel,而不是跨多个线程共享相同的Channel.虽然通道上的某些操作可以安全地同时调用,但有些操作并不会导致错误的帧交错.在线程之间共享通道也会干扰*Publisher Confirms.
线程安全非常重要,所以我尽量努力,但问题是:
我有这个从Rabbit接收消息的应用程序.收到消息后,它会对其进行处理,然后在完成时确认.应用程序可以在具有2个线程的固定线程池中同时处理2个项目.Rabbit的QOS预取设置为2,因为我不想为应用程序提供超过它在时间范围内可以处理的内容.
现在,我的消费者的handleDelivery执行以下操作:
Task run = new Task(JSON.parse(message));
service.execute(new TestWrapperThread(getChannel(),run,envelope.getDeliveryTag()));
Run Code Online (Sandbox Code Playgroud)
此时,您已经发现TestWrapperThread将该channel.basicAck(deliveryTag, false);调用作为最后一个操作.
通过我对文档的理解,这是不正确的并且可能有害,因为通道不是线程安全的,并且这种行为可能会搞砸了.但那我该怎么做呢?我的意思是,我有一些想法,但他们会把一切都变得更复杂,我想知道它是否真的有必要.
提前致谢
你应该在哪里更新芹菜设置?在远程工作人员或发件人?
例如,我有一个使用Django和Celery的API.API通过代理(RabbitMQ)将远程作业发送给我的远程工作人员.工作者正在运行python脚本(不使用Django),有时这些工作会产生子任务.
我在双方都创建了芹菜设置(发送者和工作者),即他们都需要设置BROKER_URL.但是,假设我要添加设置CELERY_ACKS_LATE = True,最后我将此设置添加到?每个远程工作人员或发件人(API)?
API和远程工作者都连接到同一个Broker,每个都以不同的方式启动芹菜.API通过Django创建芹菜实例__init__.py,工人通过主管启动芹菜,即celery -A tasks worker -l info
我们在我的工作场所有一个围绕RabbitMQ的包装库,由不再在这里工作的人创建.我正在使用Rabbit设计一个新系统,并且正在制定用于声明队列,交换和绑定的最佳方法.我们的Rabbit架构有一些联邦全局区域,每个区域有多个Rabbit节点.
发布消息和订阅队列的包装器代码每次都重新声明相关的交换,队列和绑定.我担心的是,这可能会在每个消息发布中引入显着的延迟,特别是如果它需要等待确认远程全局区域中存在队列/交换.我希望每秒数百万条消息的基准不会重新声明每次发布的交换.
简而言之,这种方法对我来说似乎有点浪费和偏执,但也许我错过了一些东西.
所以我有几个问题: