asyncio:等待来自其他线程的事件

Phi*_*rch 8 python events python-multithreading python-asyncio

我正在设计一个Python应用程序,它应该访问一台机器来执行一些(冗长的)任务.对于与网络相关的所有内容,asyncio模块似乎是一个不错的选择,但现在我需要访问一个特定组件的串行端口.我已经为实际的串口实现了一种抽象层,但是无法弄清楚如何将它与asyncio明智地集成在一起.

下面的设置:我有一个运行循环的线程,它经常与机器对话并解码响应.使用一种方法enqueue_query(),我可以将一个查询字符串放入一个队列,然后由另一个线程将其发送到机器并引发响应.通过传入threading.Event(或带有set()方法的任何东西),调用者可以执行阻塞等待响应.这可能看起来像这样:

f = threading.Event()
ch.enqueue_query('2 getnlimit', f)
f.wait()
print(ch.get_query_responses())
Run Code Online (Sandbox Code Playgroud)

我的目标是将这些行放入协程并让asyncio处理这个等待,以便应用程序可以在此期间执行其他操作.我怎么能这样做?它可能会通过将其包装f.wait()到Executor中来实现,但这似乎有点愚蠢,因为这会创建一个新线程,只是等待另一个线程做某事.

谢谢!最好的问候,菲利普

Hua*_*Gao 9

通过传入threading.Event(或带有set()方法的任何东西),调用者可以执行阻塞等待响应.

鉴于查询函数的上述行为,您只需要一个线程安全的版本asyncio.Event.这只是3行代码:

import asyncio
class Event_ts(asyncio.Event):
    #TODO: clear() method
    def set(self):
        #FIXME: The _loop attribute is not documented as public api!
        self._loop.call_soon_threadsafe(super().set)
Run Code Online (Sandbox Code Playgroud)

功能测试:

def threaded(event):
    import time
    while True:
        event.set()
        time.sleep(1)
Run Code Online (Sandbox Code Playgroud)

async def main():
    import threading
    e = Event_ts()
    threading.Thread(target=threaded, args=(e,)).start()
    while True:
        await e.wait()
        e.clear()
        print('whatever')
Run Code Online (Sandbox Code Playgroud)

asyncio.ensure_future(main())
asyncio.get_event_loop().run_forever()
Run Code Online (Sandbox Code Playgroud)

  • 这在大多数情况下应该可以正常工作,但值得注意的是,从事件循环线程调用 `e.set()` 实际上会延迟 `set()` 调用的执行,直到调用方法产生/等待或完全退出. 通常这很好,但有时可能会导致意外行为。 (3认同)

dan*_*ano 6

最简单的方法是完全按照您的建议进行操作 - 将调用包装f.wait()在执行程序中:

@asyncio.coroutine
def do_enqueue():
    f = threading.Event()
    ch.enqueue_query('2 getnlimit', f)
    yield from loop.run_in_executor(None, f.wait)
    print(ch.get_query_responses())
Run Code Online (Sandbox Code Playgroud)

您确实会产生启动线程池的开销(至少对于第一次调用,池将从那时起保留在内存中),但是任何提供threading.Event()线程安全阻塞和非阻塞等实现的解决方案APIs,在内部不依赖任何后台线程,会做更多的工作。

  • 但请注意,默认执行器的“max_workers”(即等待事件的最大协程)设置为 **40**。当我使用所提供的解决方案使“threading.barrier”可等待时,这让我很恼火。 (2认同)

小智 6

Huazuo Gau 的答案中的类Event_ts在 Python 3.9 之前运行良好,但在 3.10 中不行。这是因为在Python 3.10中私有属性_loop最初是None.

以下代码适用于 Python 3.10 以及 3.9 及更低版本。(我clear()也添加了该方法。)

import asyncio
class Event_ts(asyncio.Event):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self._loop is None:
            self._loop = asyncio.get_event_loop()

    def set(self):
        self._loop.call_soon_threadsafe(super().set)

    def clear(self):
        self._loop.call_soon_threadsafe(super().clear)
Run Code Online (Sandbox Code Playgroud)