How can I share an asyncio.Queue between multiple threads?

Che*_* A. 5 python multithreading python-asyncio

This question is different from "Is there a way to use asyncio.Queue in multiple thread?"

I have two asyncio event loops running in two different threads. Thread1 produces data for Thread2 through an asyncio.Queue().

One of the threads raises an exception: got Future <Future pending> attached to a different loop

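The error makes sense, because I have a single queue that is used from different loops. How can I share a queue between two loops running in two different threads?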

Sample code:

import asyncio
import threading

q = asyncio.Queue()

async def producer(q):
    await asyncio.sleep(3)
    await q.put(1)  # Queue.put() is a coroutine and must be awaited

def prod_work(q):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(producer(q))

async def consumer(q):
    await asyncio.sleep(3)
    res = await q.get()

def cons_work(q):
    loop2 = asyncio.new_event_loop()
    asyncio.set_event_loop(loop2)
    loop2.run_until_complete(consumer(q))

def worker(q):
    # producer thread - creates and runs its own event loop
    prod = threading.Thread(target=prod_work, args=(q,))

    # consumer thread - creates and runs a second, separate event loop
    cons = threading.Thread(target=cons_work, args=(q,))

    prod.start()
    cons.start()

worker(q)

Full stack trace:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "asyncio_examples.py", line 24, in cons_work
    loop2.run_until_complete(consumer(q))
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
  File "asyncio_examples.py", line 18, in consumer
    res = await q.get()
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/queues.py", line 159, in get
    await getter
RuntimeError: Task <Task pending coro=<consumer() running at asyncio_examples.py:18> cb=[_run_until_complete_cb() at /usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py:150]> got Future <Future pending> attached to a different loop
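
For comparison, one commonly used way to pass items between loops in different threads is to let a single loop own the queue and have the other thread schedule `put_nowait` on that loop through `loop.call_soon_threadsafe`. The following is only a minimal sketch under some assumptions: the queue is unbounded, `None` is used as an end-of-stream sentinel, and `run_demo`, `shared`, and `ready` are hypothetical names introduced for illustration, not part of the code above.

```python
import asyncio
import threading


def run_demo(items):
    """Send items from a producer loop in one thread to a consumer loop in another."""
    received = []
    ready = threading.Event()  # set once the consumer's loop and queue exist
    shared = {}                # hypothetical holder for the consumer's loop and queue

    async def consumer():
        # Create the queue inside the consumer coroutine so it belongs to this loop.
        shared["queue"] = asyncio.Queue()
        shared["loop"] = asyncio.get_running_loop()
        ready.set()
        while True:
            item = await shared["queue"].get()
            if item is None:  # sentinel (assumption): the producer is done
                break
            received.append(item)

    async def producer():
        loop, queue = shared["loop"], shared["queue"]
        for item in items:
            await asyncio.sleep(0.01)
            # Never touch the queue directly from this thread; ask the
            # owning loop to run put_nowait in its own thread instead.
            loop.call_soon_threadsafe(queue.put_nowait, item)
        loop.call_soon_threadsafe(queue.put_nowait, None)

    def cons_work():
        asyncio.run(consumer())  # separate thread, separate loop

    def prod_work():
        ready.wait()             # block the thread (not a loop) until the queue exists
        asyncio.run(producer())

    cons = threading.Thread(target=cons_work)
    prod = threading.Thread(target=prod_work)
    cons.start()
    prod.start()
    prod.join()
    cons.join()
    return received


if __name__ == "__main__":
    print(run_demo([1, 2, 3]))
```

The reasoning behind this layout is that asyncio.Queue is not thread-safe, so every operation on it has to run in the loop that owns it. If the queue were bounded and the producer needed backpressure, `asyncio.run_coroutine_threadsafe(queue.put(item), loop)` would be a possible alternative to `call_soon_threadsafe`.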