使用 RabbitMQ 进行批处理的最佳实践

Shk*_*lar 5 etl rabbitmq python-3.x

我正在寻找使用 Python 执行 ETL 的最佳方法。

我在 RabbitMQ 中有一个发送事件的通道(甚至可以每秒发送一次)。我想处理其中的每 1000 个。主要问题是 RabbitMQ 接口(我正在使用 pika)对每条消息都会引发回调。我研究了 Celery 框架,但是批处理功能在版本 3 中被弃用了。

最好的方法是什么?我考虑将事件保存在列表中,当事件达到 1000 时将其复制到其他列表并执行处理。但是,如何使其线程安全呢?我不想丢失事件,也害怕同步列表时丢失事件。

这听起来像是一个非常简单的用例,但是我没有找到任何好的最佳实践。

men*_*nya 5

如何使其线程安全?

设置消费者怎么样prefetch-count=1000?如果消费者的unack消息达到其预取限制,rabbitmq 将不会向其传递任何消息。

不要ACK收到消息,直到您有 1000 条消息,然后将其复制到其他列表并执行处理。当你的工作完成后,ACK最后一条消息以及这条消息之前的所有消息将由ACKrabbitmq服务器发送。

但我不确定大量预取是否是最佳实践。