芹菜作为网络发布/子事件

Sam*_*ill 11 python publish-subscribe celery

我想建立一个网络发布/子事件系统,但也需要能够异步运行任务.我试过让芹菜去做繁重的工作,但我觉得我正在试图填充一大堆东西只是为了让它起作用.

我有两台机器(输入和输出),他们都可以访问RabbitMQ.我想让一个主程序启动一个等待输入的循环(由网络摄像头检测到的移动).我设置了input_machine启动main.py,它启动一个芹菜任务,该任务由input_machine子工作站上的工作人员监视到"输入"队列.这个任务只运行一次True循环,直到检测到一些输入,然后它调用另一个名为('project.entered_room',什么都不做)芹菜任务到"输出"队列.

同时在output_machine上,我有一个芹菜实例正在观察"输出"队列,其中一个名为('project.entered_room'的任务响应进入房间的人).

因此,当在input_machine上检测到输入时,任务在输出机器上运行.我可以让这个工作,但遇到很多导入问题和其他令人头疼的问题.有没有更简单的方法来实现这一目标?我错了吗?我使用的是错误的工具吗?

我已经研究了许多不同的框架,包括电路和扭曲.扭曲是非常复杂的,我觉得我会用手提钻击打钉子.

小智 7

我建议跳过 Celery 并直接使用Redis及其 pub/sub 功能。例如,您可以通过运行Docker 映像来启动 Redis 。然后,在您的输入机器上,当检测到某些内容时,您会将消息发布到频道。在输出机器上,您订阅该通道并对事件采取行动。

例如,您的输入机可以使用如下内容:

import redis

def publish(message):
    r = redis.Redis(host="redis")
    r.publish("test-channel", message)
Run Code Online (Sandbox Code Playgroud)

然后在输出端:

import time
import redis

def main():
    r = redis.Redis(host="redis", decode_responses=True)
    p = r.pubsub(ignore_subscribe_messages=True)
    p.subscribe("test-channel")

    while True:
        message = p.get_message()
        if message:
            print(message.get("data", ""))
            # Do more things...
        time.sleep(0.001)
Run Code Online (Sandbox Code Playgroud)

通过这种方式,您可以在输入和输出机器之间发送纯文本或 JSON 数据。

在这里找到示例实现:https ://github.com/moritz-biersack/simple-async-pub-sub