我很惊讶这并没有真正被详细询问,但由于某种原因,我在任何地方都找不到这个问题或解决方案。似乎很多人都遇到了一个问题,即您有一个 fastAPI 应用程序,该应用程序还需要与其他一些微服务进行通信(比 http 消息更有效)。我已经阅读了所有关于集成到 asyncio 的 zmq 文档,但到目前为止,我还没有找到任何关于如何将 zmq 添加到带有 fastapi(甚至是 starlette)的事件循环中的信息。以下是来自 zmq 网站的代码示例:
import asyncio
import zmq
from zmq.asyncio import Context
ctx = Context.instance()
async def recv():
s = ctx.socket(zmq.SUB)
s.connect('tcp://127.0.0.1:5555')
s.subscribe(b'')
while True:
msg = await s.recv_multipart()
print('received', msg)
s.close()
Run Code Online (Sandbox Code Playgroud)
这向我们展示了一个异步函数,它很棒,但再次需要在事件循环中与 fastAPI 协程一起运行。这应该怎么做?除了后台任务之外,fastAPI 文档并没有真正为我们提供任何接口来运行单独的协程。我不确定后台任务中是否有其他魔法发生,但对于需要与另一个微服务通信的东西,我希望它具有类似于 fastAPI 协程的调度。此外,您无法在启动时启动后台任务,因此您必须进行一些虚假调用才能使其运行(它很笨拙……但技术上可行)。此外,如果我们可以只注册一个处理程序,那就更好了
@app.set("zmq_recv)
async def recv():
s = ctx.socket(zmq.SUB)
s.connect('tcp://127.0.0.1:5555')
s.subscribe(b'')....
Run Code Online (Sandbox Code Playgroud)
这将基于某处的配置,允许 zmq 上下文中的所有消息自动转到此函数。这可能允许我们在 fastAPI 协程中运行 zmq,只需绑定另一个端口,并确保来自该端口的所有流量都转到这个特殊的 app.set 方法。我对这样的事情会很好......
ctx = Context.instance()
@app.on_event("startup")
async def startup_event():
s = ctx.socket(zmq.PULL)
s.bind('tcp://127.0.0.1:5555').setHandler("zmq_recv") # this setHandler …Run Code Online (Sandbox Code Playgroud)