Python中传统线程和异步线程如何通信?

Inc*_*ete 6 python queue message-queue python-asyncio

在python中,在两个s之间建立单向通信的惯用方法是什么threading.Thread,将它们称为 threada和 thread b

a是生产者,它不断地产生价值以供b消费。

b是消费者,它读取 生成的一个值a,用协程处理该值,然后读取下一个值,依此类推。

插图:

q = very_magic_queue.Queue()


def worker_of_a(q):
    while True:
        q.put(1)
        time.sleep(1)

a = threading.Thread(worker_of_a, args=(q,))
a.start()


async def loop(q):
    while True:
        # v must be processed in the same order as they are produced
        v = await q.get()
        print(v)

async def foo():
    pass

async def b_main(q):
    loop_fut = asyncio.ensure_future(loop(q))
    foo_fut = asyncio.ensure_future(foo())
    _ = await asyncio.wait([loop_fut, foo_fut], ...)
    # blah blah blah

def worker_of_b(q):
    asyncio.set_event_loop(asyncio.new_event_loop())
    asyncio.get_event_loop().run_until_complete(b_main(q))

b = threading.Thread(worker_of_b, args=(q,))
b.start()
Run Code Online (Sandbox Code Playgroud)

当然,上面的代码不起作用,因为queue.Queue.get不能被awaitted,也asyncio.Queue不能在另一个线程中使用。

b我还需要一个从到 的沟通渠道a

如果该解决方案也可以与gevent.

谢谢 :)

jde*_*esa 4

您可以使用模块中的同步队列queue并将等待推迟到ThreadPoolExecutor

async def loop(q):
    from concurrent.futures import ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=1) as executor:
        loop = asyncio.get_event_loop()
        while True:
            # v must be processed in the same order as they are produced
            v = await loop.run_in_executor(executor, q.get)
            print(v)
Run Code Online (Sandbox Code Playgroud)