Fol*_*den 6 python asynchronous
我有一个异步websockets侦听器。该侦听器从同步主循环传递消息。我想让异步websockets侦听器知道有一条新消息要发送。
目前,我使用轮询循环(不良)来实现该目标。我尝试使用cond.notify_all(),但是不能在异步代码之外使用?
代码片段:
ws_data = {}
ws_data_lock = threading.Lock()
async def ws_serve(websocket, path):
    global ws_data
    global ws_data_lock
    listen_pair = await websocket.recv()
    p_fen = None
    while True:
        send = None
        with ws_data_lock:
            if p_fen == None or ws_data[listen_pair] != p_fen:
                send = p_fen = ws_data[listen_pair]
        if send:
            await websocket.send(send)
        await asyncio.sleep(0.25)
...
def run_websockets_server():
    start_server = websockets.serve(ws_serve, ws_interface, ws_port)
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
def client_listener():
    while True:
        with ws_data_lock:
            ws_data[pair_id] = (p1_user, p2_user, time.time())
            # here I would like to let all websocket listeners know that
            # there's new data
t = threading.Thread(target=client_listener)
t.start()
run_websockets_server()
我只能以未经测试的代码片段的形式回答这个问题。我发现整个程序太复杂,无法自己修改和测试。
有两个问题需要解决。
只有一种选择,那就是call_soon_threadsafe()。
我们需要存储循环引用:
def run_websockets_server():
    global ws_loop
    ....
    ws_loop = asyncio.get_event_loop()
    ws_loop.run_until_complete(start_server)
    ws_loop.run_forever()
然后我们可以告诉循环安排一个函数立即执行:
def client_listener():
    ...
            # here I would like to let all websocket listeners know that
            # there's new data
            ws_loop.call_soon_threadsafe(notify_all)
但是我们可以发送新数据,这样消费者就不必获取它。
            # here I would like to let all websocket listeners know that
            # there's new data
            new_data = ...
            ws_loop.call_soon_threadsafe(notify_all, new_data)
请自行决定是否仍需要锁定。
ws_serve实例让我们使用asyncio.Queue。显然我们每个实例都需要一个。首先必须定义一个存储:
# new global variable
ws_queues = set()
每个实例将维护自己的队列:
async def ws_serve(websocket, path):
    queue = asyncio.Queue()
    ws_queues.add(queue)
    try:
        ...
        while True:
            new_data = await queue.get()
            # send
    finally:
        ws_queues.remove(queue)
以及最后缺失的部分:
def notify_all(data):
    for q in ws_queues:
        q.put_nowait(data)
| 归档时间: | 
 | 
| 查看次数: | 153 次 | 
| 最近记录: |