有没有办法在多个线程中使用asyncio.Queue?

Ale*_*lev 13 python python-3.x python-asyncio

我们假设我有以下代码:

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        print(queue.qsize())

@asyncio.coroutine
def async():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

这段代码的问题在于asynccoroutine 中的循环永远不会完成第一次迭代,而queue大小正在增加.

为什么会这样发生,我该怎么做才能解决它?

我无法摆脱单独的线程,因为在我的实际代码中,我使用单独的线程与串行设备进行通信,而我还没有找到使用它的方法asyncio.

dan*_*ano 23

asyncio.Queue 是不是线程安全的,所以你不能直接从多个线程使用它.相反,您可以使用janus,这是一个提供线程感知asyncio队列的第三方库:

import asyncio
import threading
import janus

def threaded(squeue):
    import time
    while True:
        time.sleep(2)
        squeue.put_nowait(time.time())
        print(squeue.qsize())

@asyncio.coroutine
def async(aqueue):
    while True:
        time = yield from aqueue.get()
        print(time)

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
asyncio.Task(asyncio.ensure_future(queue.async_q))
threading.Thread(target=threaded, args=(queue.sync_q,)).start()
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

还有aioprocessing(完全披露:我写了它),它提供了进程安全(以及作为副作用,线程安全)队列,但是如果你不想使用那就太过分了multiprocessing.


cro*_*nos 6

如果您不想使用其他库,则可以从线程中安排协程。queue.put_nowait用以下内容代替可以正常工作。

asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop)
Run Code Online (Sandbox Code Playgroud)

该变量loop表示主线程中的事件循环。

编辑:

async协程不执行任何操作的原因是事件循环永远不会给它机会。队列对象不是线程安全的,如果您深入研究cpython代码,您会发现这意味着put_nowait通过使用call_soon事件循环方法中的future 来唤醒队列的使用者。如果我们可以使用call_soon_threadsafe它,它应该可以工作。call_soon和之间的主要区别call_soon_threadsafe是,call_soon_threadsafe通过调用唤醒了事件循环loop._write_to_self()。因此,我们自己称呼它:

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        queue._loop._write_to_self()
        print(queue.qsize())

@asyncio.coroutine
def async():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

然后,一切正常。

至于访问共享对象的线程安全方面,请asyncio.queuecollections.deque具有线程安全append和的后台使用popleft。也许检查队列是否不为空并且popleft不是原子的,但是如果仅在一个线程(事件循环之一)中使用队列,就可以了。

loop.call_soon_threadsafe高华佐的回答和我提出的其他解决方案asyncio.run_coroutine_threadsafe都在这样做,这会唤醒事件循环。


Hua*_*Gao 5

BaseEventLoop.call_soon_threadsafe就在眼前。有关详细信息,请参见asyncio文档

只需threaded()像这样更改您的:

def threaded():
    import time
    while True:
        time.sleep(1)
        loop.call_soon_threadsafe(queue.put_nowait, time.time())
        loop.call_soon_threadsafe(lambda: print(queue.qsize()))
Run Code Online (Sandbox Code Playgroud)

这是一个示例输出:

0
1443857763.3355968
0
1443857764.3368602
0
1443857765.338082
0
1443857766.3392274
0
1443857767.3403943
Run Code Online (Sandbox Code Playgroud)