Inc*_*ete 6 python queue message-queue python-asyncio
在python中,在两个s之间建立单向通信的惯用方法是什么threading.Thread,将它们称为 threada和 thread b。
a是生产者,它不断地产生价值以供b消费。
b是消费者,它读取 生成的一个值a,用协程处理该值,然后读取下一个值,依此类推。
插图:
q = very_magic_queue.Queue()
def worker_of_a(q):
while True:
q.put(1)
time.sleep(1)
a = threading.Thread(worker_of_a, args=(q,))
a.start()
async def loop(q):
while True:
# v must be processed in the same order as they are produced
v = await q.get()
print(v)
async def foo():
pass
async def b_main(q):
loop_fut = asyncio.ensure_future(loop(q))
foo_fut = asyncio.ensure_future(foo())
_ = await asyncio.wait([loop_fut, foo_fut], ...)
# blah blah blah
def worker_of_b(q):
asyncio.set_event_loop(asyncio.new_event_loop())
asyncio.get_event_loop().run_until_complete(b_main(q))
b = threading.Thread(worker_of_b, args=(q,))
b.start()
Run Code Online (Sandbox Code Playgroud)
当然,上面的代码不起作用,因为queue.Queue.get不能被awaitted,也asyncio.Queue不能在另一个线程中使用。
b我还需要一个从到 的沟通渠道a。
如果该解决方案也可以与gevent.
谢谢 :)
您可以使用模块中的同步队列queue并将等待推迟到ThreadPoolExecutor:
async def loop(q):
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=1) as executor:
loop = asyncio.get_event_loop()
while True:
# v must be processed in the same order as they are produced
v = await loop.run_in_executor(executor, q.get)
print(v)
Run Code Online (Sandbox Code Playgroud)