我有一个单一队列的RabbitMQ交换.我希望创建一个运行多个线程的守护进程,并尽快通过此队列.
"工作"涉及与外部服务进行通信,因此每个消费者中都会有相当多的阻塞.因此,我希望有多个线程处理来自同一队列的消息.
我可以通过在我的主线程上使用队列来实现这一点,然后将传入的工作转移到其他线程池中,但是有没有办法在每个线程的上下文中启动多个使用者?
我有一个rabbitmq服务器和一个使用kombu的amqp使用者(python).
我已将我的应用程序安装在具有防火墙的系统中,该防火墙在1小时后关闭空闲连接.
这是我的amqp_consumer.py:
try:
# connections
with Connection(self.broker_url, ssl=_ssl, heartbeat=self.heartbeat) as conn:
chan = conn.channel()
# more stuff here
with conn.Consumer(queue, callbacks = [messageHandler], channel = chan):
# Process messages and handle events on all channels
while True:
conn.drain_events()
except Exception as e:
# do stuff
Run Code Online (Sandbox Code Playgroud)
我想要的是,如果防火墙关闭了连接,那么我想重新连接.我应该使用heartbeat参数还是应该将超时参数(3600秒)传递给drain_events()
函数?
两种选择有什么区别?(似乎做同样的事).
谢谢.
我正在寻找能够概述"排队"的文章和参考文献(我可能在这里没有使用正确的术语).我希望通过Redis,RabbitMQ,Celery,Kombu以及其他任何我尚未阅读的组件,以及它们如何组合在一起的介绍性风格指南.
我的问题是我需要排队由我的Django网站发布的后台任务,我阅读的每篇博客和文章都推荐不同的解决方案.
我正在寻找一种方法来使用kombu作为tornado-sockjs和Django应用服务器之间的MQ适配器.我做了类似的事情:
class BrokerClient(ConsumerMixin):
clients = []
def __init__(self):
self.connection = BrokerConnection(settings.BROKER_URL)
self.io_loop = ioloop.IOLoop.instance()
self.queue = sockjs_queue
self._handle_loop()
@staticmethod
def instance():
if not hasattr(BrokerClient, '_instance'):
BrokerClient._instance = BrokerClient()
return BrokerClient._instance
def add_client(self, client):
self.clients.append(client)
def remove_client(self, client):
self.clients.remove(client)
def _handle_loop(self):
try:
if self.restart_limit.can_consume(1):
for _ in self.consume(limit=5):
pass
except self.connection.connection_errors:
print ('Connection to broker lost. '
'Trying to re-establish the connection...')
self.io_loop.add_timeout(datetime.timedelta(0.0001), self._handle_loop)
def get_consumers(self, Consumer, channel):
return [Consumer([self.queue, ], callbacks=[self.process_task])]
def process_task(self, body, message):
for client in self.clients:
if …
Run Code Online (Sandbox Code Playgroud) 我正在测试kombu的工作原理.我打算在几个项目中取代鼠兔.我看到kombu有很多文档,但使用我在文档中找到的一些消息丢失了.这是代码:
from kombu import Connection, Producer
conn = Connection('amqp://localhost:5672')
def errback(exc, interval):
logger.error('Error: %r', exc, exc_info=1)
logger.info('Retry in %s seconds.', interval)
producer = Producer(conn)
publish = conn.ensure(producer, producer.publish, errback=errback, max_retries=3)
for i in range(1, 200000):
publish({'hello': 'world'}, routing_key='test_queue')
time.sleep(0.001)
Run Code Online (Sandbox Code Playgroud)
当它发布时我多次关闭连接并继续发布但在队列中有大约60000条消息,所以有很多丢失的消息.
我尝试了不同的替代品,例如:
publish({'hello': 'world'}, retry=True, mandatory=True, routing_key='hipri')
Run Code Online (Sandbox Code Playgroud)
谢谢!
我有一个RabbitMQ服务器,作为我的一个系统的队列中心.在过去一周左右,它的制作人每隔几个小时就会完全停止.
在cantSleepNow的回答之后,我将分配给RabbitMQ的内存增加到90%.服务器有16GB的内存,消息数量不是很高(每天数百万),所以这似乎不是问题.
从命令行:
sudo rabbitmqctl set_vm_memory_high_watermark 0.9
Run Code Online (Sandbox Code Playgroud)
并与/etc/rabbitmq/rabbitmq.config
:
[
{rabbit,
[
{loopback_users, []},
{vm_memory_high_watermark, 0.9}
]
}
].
Run Code Online (Sandbox Code Playgroud)
我为所有消费者和生产者使用Python.
生产者是提供呼叫的API服务器.每当呼叫到达时,都会打开一个连接,发送一条消息并关闭连接.
from kombu import Connection
def send_message_to_queue(host, port, queue_name, message):
"""Sends a single message to the queue."""
with Connection('amqp://guest:guest@%s:%s//' % (host, port)) as conn:
simple_queue = conn.SimpleQueue(name=queue_name, no_ack=True)
simple_queue.put(message)
simple_queue.close()
Run Code Online (Sandbox Code Playgroud)
消费者彼此略有不同,但通常使用以下模式 - 打开连接,并等待消息到达.连接可以长时间保持打开状态(比如几天).
with Connection('amqp://whatever:whatever@whatever:whatever//') as conn:
while True:
queue = …
Run Code Online (Sandbox Code Playgroud) 我有一个继承的类,kombu.ConsumerProducerMixin
我想在没有实际运行的 rabbitmq 服务的情况下对其进行测试。
class Aggregator(ConsumerProducerMixin):
def __init__(self, broker_url):
exchange_name = 'chargers'
self.status = 0
self.connection = Connection(broker_url)
...
Run Code Online (Sandbox Code Playgroud)
在我的测试文件中,我执行了以下操作:
from unittest.mock import Mock, patch
from aggregator import Aggregator
@patch('kombu.connection.Connection')
def test_on_request(conn_mock):
agg = Aggregator('localhost')
m = Message("", {"action": "start"}, content_type="application/json")
Run Code Online (Sandbox Code Playgroud)
进入Aggregator.__init__
调试器,我看到它connection
仍然没有被修补为一个Mock
实例:
(Pdb) self.connection
<Connection: amqp://guest:**@localhost:5672// at 0x7fc8b7f636d8>
(Pdb) Connection
<class 'kombu.connection.Connection'>
Run Code Online (Sandbox Code Playgroud)
我的问题是如何正确修补连接,以便我不需要rabbitmq 来运行测试?
与芹菜相关的气流cfg设置有:
broker_url = 'amqp://guest:guest@rabbitmq_server:8080'
celery_result_backend = db+postgresql://developer:password@postgres_server:5432/db_name
Run Code Online (Sandbox Code Playgroud)
该airflow webserver
运行正常,但在运行从气流UI任务我得到的错误.
运行气流调度程序时出错,tracecak是:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1988, in wsgi_app
response = self.full_dispatch_request()
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1641, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1544, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1639, in full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1625, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, …
Run Code Online (Sandbox Code Playgroud) 考虑一个带有数据库记录的系统。每条记录可以处于 alive
或expired
状态;live
应使用外部软件模块定期处理记录。
我使用 Kombu 和 RabbitMQ 的经典生产者-消费者架构解决了这个问题。生产者每隔几秒从数据库中获取记录,然后消费者处理它们。
事件的数量live
差异很大,在高峰时段,消费者无法处理负载,并且队列被数千个项目堵塞。
我想让系统具有适应性,这样如果队列为空,生产者就不会向消费者发送新事件。
http://<host>:<port/api/queues/<vhost>/<queue_name>
. 它有效,但它是另一种需要维护的机制,我更喜欢 Kombu 中的优雅解决方案。如何使用 Python 的 Kombu 检查 RabbitMQ 是否为空?
我的集群中有 3 个处于 HA 模式的 RabbitMQ 节点。每个节点都位于单独的 Docker 容器上。
我正在使用 Celery 版本 4 和 kombu 版本 4。
我使用此命令来设置 HA 策略:
rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Run Code Online (Sandbox Code Playgroud)
芹菜配置如下所示:
CELERY = dict(
broker_url=[
'amqp://guest@rabbitmq1:5672',
'amqp://guest@rabbitmq2:5672',
'amqp://guest@rabbitmq3:5672',
],
celery_queue_ha_policy='all',
...
)
Run Code Online (Sandbox Code Playgroud)
一切正常,直到我停止主 RabbitMQ 应用程序以便使用命令测试 Celery 故障转移功能:
rabbitmqctl stop_app
Run Code Online (Sandbox Code Playgroud)
RabbitMQ 应用程序停止后,我开始在下面的日志中看到错误。日志消息的频率非常高,并且不会随着尝试次数而减慢。
根据日志,Celery 尝试使用下一次故障转移重新连接,但它被另一次重新连接到已停止的主节点的尝试中断。同样的事情一遍又一遍地发生,就像无限循环一样。
[2017-03-17 15:10:28,084: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.
[2017-03-17 15:10:28,300: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL. See …
Run Code Online (Sandbox Code Playgroud)