Sam*_*ill 11 python publish-subscribe celery
我想建立一个网络发布/子事件系统,但也需要能够异步运行任务.我试过让芹菜去做繁重的工作,但我觉得我正在试图填充一大堆东西只是为了让它起作用.
我有两台机器(输入和输出),他们都可以访问RabbitMQ.我想让一个主程序启动一个等待输入的循环(由网络摄像头检测到的移动).我设置了input_machine启动main.py,它启动一个芹菜任务,该任务由input_machine子工作站上的工作人员监视到"输入"队列.这个任务只运行一次True循环,直到检测到一些输入,然后它调用另一个名为('project.entered_room',什么都不做)芹菜任务到"输出"队列.
同时在output_machine上,我有一个芹菜实例正在观察"输出"队列,其中一个名为('project.entered_room'的任务响应进入房间的人).
因此,当在input_machine上检测到输入时,任务在输出机器上运行.我可以让这个工作,但遇到很多导入问题和其他令人头疼的问题.有没有更简单的方法来实现这一目标?我错了吗?我使用的是错误的工具吗?
我已经研究了许多不同的框架,包括电路和扭曲.扭曲是非常复杂的,我觉得我会用手提钻击打钉子.
小智 7
我建议跳过 Celery 并直接使用Redis及其 pub/sub 功能。例如,您可以通过运行Docker 映像来启动 Redis 。然后,在您的输入机器上,当检测到某些内容时,您会将消息发布到频道。在输出机器上,您订阅该通道并对事件采取行动。
例如,您的输入机可以使用如下内容:
import redis
def publish(message):
r = redis.Redis(host="redis")
r.publish("test-channel", message)
Run Code Online (Sandbox Code Playgroud)
然后在输出端:
import time
import redis
def main():
r = redis.Redis(host="redis", decode_responses=True)
p = r.pubsub(ignore_subscribe_messages=True)
p.subscribe("test-channel")
while True:
message = p.get_message()
if message:
print(message.get("data", ""))
# Do more things...
time.sleep(0.001)
Run Code Online (Sandbox Code Playgroud)
通过这种方式,您可以在输入和输出机器之间发送纯文本或 JSON 数据。
在这里找到示例实现:https ://github.com/moritz-biersack/simple-async-pub-sub