标签: kombu

使用具有多个线程的rabbitmq消息队列(Python Kombu)

我有一个单一队列的RabbitMQ交换.我希望创建一个运行多个线程的守护进程,并尽快通过此队列.

"工作"涉及与外部服务进行通信,因此每个消费者中都会有相当多的阻塞.因此,我希望有多个线程处理来自同一队列的消息.

我可以通过在我的主线程上使用队列来实现这一点,然后将传入的工作转移到其他线程池中,但是有没有办法在每个线程的上下文中启动多个使用者?

python multithreading rabbitmq kombu

7
推荐指数
1
解决办法
3843
查看次数

RabbitMQ心跳与连接排除事件超时


我有一个rabbitmq服务器和一个使用kombu的am​​qp使用者(python).
我已将我的应用程序安装在具有防火墙的系统中,该防火墙在1小时后关闭空闲连接.
这是我的amqp_consumer.py:

try:
    # connections
    with Connection(self.broker_url, ssl=_ssl, heartbeat=self.heartbeat) as conn:
        chan = conn.channel()
        # more stuff here
        with conn.Consumer(queue, callbacks = [messageHandler], channel = chan):
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

except Exception as e:
    # do stuff
Run Code Online (Sandbox Code Playgroud)

我想要的是,如果防火墙关闭了连接,那么我想重新连接.我应该使用heartbeat参数还是应该将超时参数(3600秒)传递给drain_events()函数?
两种选择有什么区别?(似乎做同样的事).
谢谢.

python timeout heartbeat rabbitmq kombu

7
推荐指数
1
解决办法
981
查看次数

'排队'教程和文档?

我正在寻找能够概述"排队"的文章和参考文献(我可能在这里没有使用正确的术语).我希望通过Redis,RabbitMQ,Celery,Kombu以及其他任何我尚未阅读的组件,以及它们如何组合在一起的介绍性风格指南.

我的问题是我需要排队由我的Django网站发布的后台任务,我阅读的每篇博客和文章都推荐不同的解决方案.

rabbitmq task-queue redis celery kombu

6
推荐指数
1
解决办法
680
查看次数

Kombu以非阻塞的方式

我正在寻找一种方法来使用kombu作为tornado-sockjs和Django应用服务器之间的MQ适配器.我做了类似的事情:

class BrokerClient(ConsumerMixin):
    clients = []

    def __init__(self):
        self.connection = BrokerConnection(settings.BROKER_URL)
        self.io_loop = ioloop.IOLoop.instance()
        self.queue = sockjs_queue
        self._handle_loop()

    @staticmethod
    def instance():
        if not hasattr(BrokerClient, '_instance'):
            BrokerClient._instance = BrokerClient()
        return BrokerClient._instance

    def add_client(self, client):
        self.clients.append(client)

    def remove_client(self, client):
        self.clients.remove(client)

    def _handle_loop(self):
        try:
            if self.restart_limit.can_consume(1):
                for _ in self.consume(limit=5):
                    pass
        except self.connection.connection_errors:
            print ('Connection to broker lost. '
             'Trying to re-establish the connection...')
        self.io_loop.add_timeout(datetime.timedelta(0.0001), self._handle_loop)

    def get_consumers(self, Consumer, channel):
        return [Consumer([self.queue, ], callbacks=[self.process_task])]

    def process_task(self, body, message):
        for client in self.clients:
            if …
Run Code Online (Sandbox Code Playgroud)

python tornado kombu

6
推荐指数
1
解决办法
3189
查看次数

使用kombu重试发布消息的最佳方法是什么?

我正在测试kombu的工作原理.我打算在几个项目中取代鼠兔.我看到kombu有很多文档,但使用我在文档中找到的一些消息丢失了.这是代码:

from kombu import Connection, Producer
conn = Connection('amqp://localhost:5672')
def errback(exc, interval):
     logger.error('Error: %r', exc, exc_info=1)
     logger.info('Retry in %s seconds.', interval)
producer = Producer(conn)
publish = conn.ensure(producer, producer.publish, errback=errback, max_retries=3)
for i in range(1, 200000):
   publish({'hello': 'world'}, routing_key='test_queue')
   time.sleep(0.001)
Run Code Online (Sandbox Code Playgroud)

当它发布时我多次关闭连接并继续发布但在队列中有大约60000条消息,所以有很多丢失的消息.

我尝试了不同的替代品,例如:

publish({'hello': 'world'}, retry=True, mandatory=True, routing_key='hipri')
Run Code Online (Sandbox Code Playgroud)

谢谢!

python kombu

6
推荐指数
1
解决办法
1888
查看次数

Python:Kombu + RabbitMQ死锁 - 队列被阻止或阻塞

问题

我有一个RabbitMQ服务器,作为我的一个系统的队列中心.在过去一周左右,它的制作人每隔几个小时就会完全停止.

我试过了什么

蛮力

  • 停止消费者会释放锁定几分钟,但随后阻止返回.
  • 重启RabbitMQ解决了几个小时的问题.
  • 我有一些自动脚本可以完成丑陋的重启,但显然远非正确的解决方案.

分配更多内存

cantSleepNow的回答之后,我将分配给RabbitMQ内存增加到90%.服务器有16GB的内存,消息数量不是很高(每天数百万),所以这似乎不是问题.

从命令行:

sudo rabbitmqctl set_vm_memory_high_watermark 0.9
Run Code Online (Sandbox Code Playgroud)

并与/etc/rabbitmq/rabbitmq.config:

[
   {rabbit,
   [
     {loopback_users, []},
     {vm_memory_high_watermark, 0.9}
   ]
   }
].
Run Code Online (Sandbox Code Playgroud)

代码与设计

我为所有消费者和生产者使用Python.

生产者

生产者是提供呼叫的API服务器.每当呼叫到达时,都会打开一个连接,发送一条消息并关闭连接.

from kombu import Connection

def send_message_to_queue(host, port, queue_name, message):
    """Sends a single message to the queue."""
    with Connection('amqp://guest:guest@%s:%s//' % (host, port)) as conn:
        simple_queue = conn.SimpleQueue(name=queue_name, no_ack=True)
        simple_queue.put(message)
        simple_queue.close()
Run Code Online (Sandbox Code Playgroud)

消费者

消费者彼此略有不同,但通常使用以下模式 - 打开连接,并等待消息到达.连接可以长时间保持打开状态(比如几天).

with Connection('amqp://whatever:whatever@whatever:whatever//') as conn:
    while True:
        queue = …
Run Code Online (Sandbox Code Playgroud)

python deadlock rabbitmq blockingqueue kombu

6
推荐指数
1
解决办法
1063
查看次数

在 pytest 中模拟连接类

我有一个继承的类,kombu.ConsumerProducerMixin我想在没有实际运行的 rabbitmq 服务的情况下对其进行测试。

class Aggregator(ConsumerProducerMixin):

    def __init__(self, broker_url):
        exchange_name = 'chargers'
        self.status = 0
        self.connection = Connection(broker_url)
        ...
Run Code Online (Sandbox Code Playgroud)

在我的测试文件中,我执行了以下操作:

from unittest.mock import Mock, patch

from aggregator import Aggregator

@patch('kombu.connection.Connection')
def test_on_request(conn_mock):

    agg = Aggregator('localhost')
    m = Message("", {"action": "start"}, content_type="application/json")
Run Code Online (Sandbox Code Playgroud)

进入Aggregator.__init__调试器,我看到它connection仍然没有被修补为一个Mock实例:

(Pdb) self.connection
<Connection: amqp://guest:**@localhost:5672// at 0x7fc8b7f636d8>
(Pdb) Connection
<class 'kombu.connection.Connection'>
Run Code Online (Sandbox Code Playgroud)

我的问题是如何正确修补连接,以便我不需要rabbitmq 来运行测试?

python unit-testing mocking rabbitmq kombu

6
推荐指数
1
解决办法
1379
查看次数

来自UI的气流运行任务,KeyError:没有这样的传输

与芹菜相关的气流cfg设置有:

broker_url = 'amqp://guest:guest@rabbitmq_server:8080'
celery_result_backend = db+postgresql://developer:password@postgres_server:5432/db_name
Run Code Online (Sandbox Code Playgroud)

airflow webserver运行正常,但在运行从气流UI任务我得到的错误. 从ui运行任务时出错

运行气流调度程序时出错,tracecak是:

 Traceback (most recent call last):
 File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1988, in wsgi_app
response = self.full_dispatch_request()
 File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1641, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1544, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1639, in full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1625, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, …
Run Code Online (Sandbox Code Playgroud)

celery kombu airflow apache-airflow

6
推荐指数
1
解决办法
952
查看次数

Kombu+RabbitMQ:检查队列是否为空

建筑学

考虑一个带有数据库记录的系统。每条记录可以处于 aliveexpired状态;live应使用外部软件模块定期处理记录。

我使用 Kombu 和 RabbitMQ 的经典生产者-消费者架构解决了这个问题。生产者每隔几秒从数据库中获取记录,然后消费者处理它们。

在此输入图像描述

问题

事件的数量live差异很大,在高峰时段,消费者无法处理负载,并且队列被数千个项目堵塞。

我想让系统具有适应性,这样如果队列为空,生产者就不会向消费者发送新事件。

我尝试过什么

  • 搜索 Kombu 文档/API
  • 检查队列对象
  • 使用 RabbitMQ REST API:http://<host>:<port/api/queues/<vhost>/<queue_name>. 它有效,但它是另一种需要维护的机制,我更喜欢 Kombu 中的优雅解决方案。

如何使用 Python 的 Kombu 检查 RabbitMQ 是否为空?

python rabbitmq kombu

5
推荐指数
1
解决办法
3223
查看次数

Celery RabbitMQ 代理故障转移连接问题

我的集群中有 3 个处于 HA 模式的 RabbitMQ 节点。每个节点都位于单独的 Docker 容器上。

我正在使用 Celery 版本 4 和 kombu 版本 4。

我使用此命令来设置 HA 策略:

rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Run Code Online (Sandbox Code Playgroud)

芹菜配置如下所示:

CELERY = dict(
    broker_url=[
        'amqp://guest@rabbitmq1:5672',
        'amqp://guest@rabbitmq2:5672',
        'amqp://guest@rabbitmq3:5672',
    ],
   celery_queue_ha_policy='all',
   ...
)
Run Code Online (Sandbox Code Playgroud)

一切正常,直到我停止主 RabbitMQ 应用程序以便使用命令测试 Celery 故障转移功能:

rabbitmqctl stop_app
Run Code Online (Sandbox Code Playgroud)

RabbitMQ 应用程序停止后,我开始在下面的日志中看到错误。日志消息的频率非常高,并且不会随着尝试次数而减慢。

根据日志,Celery 尝试使用下一次故障转移重新连接,但它被另一次重新连接到已停止的主节点的尝试中断。同样的事情一遍又一遍地发生,就像无限循环一样。

[2017-03-17 15:10:28,084: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbitmq1:5672//: [Errno 111] Connection refused.
Will retry using next failover.

[2017-03-17 15:10:28,300: DEBUG/MainProcess] Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See …
Run Code Online (Sandbox Code Playgroud)

failover rabbitmq celery kombu

5
推荐指数
1
解决办法
4214
查看次数