我有一个订阅多个发布者的 zmq 服务器(但在这个例子中只有一个)
通过创建协程和接收序列化多部分消息的反序列化以使得服务器异步后pickle.loads和json.loads两者似乎都不起作用。(我什至没有收到错误消息)
当我在没有异步协程序列化的情况下制作我的应用程序时,它就像一个魅力。如果我省略反序列化,服务器也可以完美运行。
所以我想知道是否出于某种原因需要为 asyncio 采用 json 和 pickle api,如果是这样:这通常是如何完成的?
async def receiver():
print("1")
socket = ctx.socket(zmq.SUB)
with ctx.socket(zmq.SUB) as socket:
print("2")
socket.setsockopt(zmq.SUBSCRIBE, b"")
print("3")
for url in urls:
socket.connect(url)
print("4")
poller = Poller()
poller.register(socket, zmq.POLLIN)
while True:
events = await poller.poll()
if socket in dict(events):
print("5")
items = await socket.recv_multipart()
res = b"".join(items)
s = res.decode(encoding="utf-8")
print(s)
t = json.loads(s)
print(t)
print("6")
Run Code Online (Sandbox Code Playgroud)
如果我省略这些行t = json.loads(s),print(t)我将获得输出 6,当收到更多消息时,我将获得输出5以及 …