如何从同步例程通知异步例程?

Fol*_*den 6 python asynchronous

我有一个异步websockets侦听器。该侦听器从同步主循环传递消息。我想让异步websockets侦听器知道有一条新消息要发送。

目前,我使用轮询循环(不良)来实现该目标。我尝试使用cond.notify_all(),但是不能在异步代码之外使用?

代码片段:

ws_data = {}
ws_data_lock = threading.Lock()

async def ws_serve(websocket, path):
    global ws_data
    global ws_data_lock

    listen_pair = await websocket.recv()

    p_fen = None

    while True:
        send = None

        with ws_data_lock:
            if p_fen == None or ws_data[listen_pair] != p_fen:
                send = p_fen = ws_data[listen_pair]

        if send:
            await websocket.send(send)

        await asyncio.sleep(0.25)
Run Code Online (Sandbox Code Playgroud)

...

def run_websockets_server():
    start_server = websockets.serve(ws_serve, ws_interface, ws_port)

    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()

def client_listener():
    while True:
        with ws_data_lock:
            ws_data[pair_id] = (p1_user, p2_user, time.time())
            # here I would like to let all websocket listeners know that
            # there's new data

t = threading.Thread(target=client_listener)
t.start()

run_websockets_server()
Run Code Online (Sandbox Code Playgroud)

VPf*_*PfB 1

我只能以未经测试的代码片段的形式回答这个问题。我发现整个程序太复杂,无法自己修改和测试。

有两个问题需要解决。

1. 将数据从一个线程传递到另一个线程中的 asyncio 协程

只有一种选择,那就是call_soon_threadsafe()

我们需要存储循环引用:

def run_websockets_server():
    global ws_loop

    ....
    ws_loop = asyncio.get_event_loop()
    ws_loop.run_until_complete(start_server)
    ws_loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

然后我们可以告诉循环安排一个函数立即执行:

def client_listener():
    ...
            # here I would like to let all websocket listeners know that
            # there's new data
            ws_loop.call_soon_threadsafe(notify_all)
Run Code Online (Sandbox Code Playgroud)

但是我们可以发送新数据,这样消费者就不必获取它。

            # here I would like to let all websocket listeners know that
            # there's new data
            new_data = ...
            ws_loop.call_soon_threadsafe(notify_all, new_data)
Run Code Online (Sandbox Code Playgroud)

请自行决定是否仍需要锁定。

2.将数据传递给所有ws_serve实例

让我们使用asyncio.Queue。显然我们每个实例都需要一个。首先必须定义一个存储:

# new global variable
ws_queues = set()
Run Code Online (Sandbox Code Playgroud)

每个实例将维护自己的队列:

async def ws_serve(websocket, path):
    queue = asyncio.Queue()
    ws_queues.add(queue)
    try:
        ...
        while True:
            new_data = await queue.get()
            # send
    finally:
        ws_queues.remove(queue)
Run Code Online (Sandbox Code Playgroud)

以及最后缺失的部分:

def notify_all(data):
    for q in ws_queues:
        q.put_nowait(data)
Run Code Online (Sandbox Code Playgroud)