json.loads 和 pickle.loads 不能与 async 关键字一起使用?

Ren*_*rdt 5 json python-3.x pyzmq python-asyncio

我有一个订阅多个发布者的 zmq 服务器(但在这个例子中只有一个)

通过创建协程和接收序列化多部分消息的反序列化以使得服务器异步后pickle.loadsjson.loads两者似乎都不起作用。(我什至没有收到错误消息)

当我在没有异步协程序列化的情况下制作我的应用程序时,它就像一个魅力。如果我省略反序列化,服务器也可以完美运行。

所以我想知道是否出于某种原因需要为 asyncio 采用 json 和 pickle api,如果是这样:这通常是如何完成的?

async def receiver():
    print("1")
    socket =  ctx.socket(zmq.SUB)
    with ctx.socket(zmq.SUB) as socket:
        print("2")
        socket.setsockopt(zmq.SUBSCRIBE, b"")
        print("3")
        for url in urls:
            socket.connect(url)
        print("4")
        poller = Poller()
        poller.register(socket, zmq.POLLIN)
        while True:
            events = await poller.poll()
            if socket in dict(events):
                print("5")
                items = await socket.recv_multipart()
                res = b"".join(items)
                s = res.decode(encoding="utf-8")
                print(s)
                t = json.loads(s)
                print(t)
                print("6")
Run Code Online (Sandbox Code Playgroud)

如果我省略这些行t = json.loads(s)print(t)我将获得输出 6,当收到更多消息时,我将获得输出5以及6每条收到的消息。如果我保留这些行,我就不会得到输出6,也不会再收到任何消息

发布者代码基本上是这样的:

ctx = zmq.Context()
socket = ctx.socket(zmq.PUB)
socket.bind(addr)

f = open("foo.json", "r")
for line in f:
    jsn = json.loads(line)
    for el in jsn["somekey"]["someOtherKey"]:
        data = {"data":el,"some more metadata":"here is the meta data"}
        b = json.dumps(data).encode()
        #b = pickle.dumps(data,protocol=pickle.HIGHEST_PROTOCOL)
        socket.send_multipart(b)
Run Code Online (Sandbox Code Playgroud)

我应该声明发布者在 python2 上运行,而接收者在 python3 上运行。发布者代码不会在 python3 上运行,因为我收到了以下带有 python3 的堆栈跟踪(这对我来说也没有意义)

 Traceback (most recent call last):
 File ".../python3.6/site-packages/zmq/sugar/socket.py", line 356, in send_multipart
    _buffer_type(msg)
 TypeError: memoryview: a bytes-like object is required, not 'int'

 During handling of the above exception, another exception occurred:

 Traceback (most recent call last):
 File "Publisher2.py", line 39, in <module>
    main("tcp://127.0.0.1:5556", "mockPublisher")
 File "Publisher2.py", line 28, in main
    socket.send_multipart(b)
 File ".../python3.6/site-packages/zmq/sugar/socket.py", line 363, in send_multipart
    i, rmsg,
 TypeError: Frame 0 (123) does not support the buffer interface.
Run Code Online (Sandbox Code Playgroud)