Phi*_*rch 8 python events python-multithreading python-asyncio
我正在设计一个Python应用程序,它应该访问一台机器来执行一些(冗长的)任务.对于与网络相关的所有内容,asyncio模块似乎是一个不错的选择,但现在我需要访问一个特定组件的串行端口.我已经为实际的串口实现了一种抽象层,但是无法弄清楚如何将它与asyncio明智地集成在一起.
下面的设置:我有一个运行循环的线程,它经常与机器对话并解码响应.使用一种方法enqueue_query(),我可以将一个查询字符串放入一个队列,然后由另一个线程将其发送到机器并引发响应.通过传入threading.Event(或带有set()方法的任何东西),调用者可以执行阻塞等待响应.这可能看起来像这样:
f = threading.Event()
ch.enqueue_query('2 getnlimit', f)
f.wait()
print(ch.get_query_responses())
Run Code Online (Sandbox Code Playgroud)
我的目标是将这些行放入协程并让asyncio处理这个等待,以便应用程序可以在此期间执行其他操作.我怎么能这样做?它可能会通过将其包装f.wait()到Executor中来实现,但这似乎有点愚蠢,因为这会创建一个新线程,只是等待另一个线程做某事.
谢谢!最好的问候,菲利普
通过传入
threading.Event(或带有set()方法的任何东西),调用者可以执行阻塞等待响应.
鉴于查询函数的上述行为,您只需要一个线程安全的版本asyncio.Event.这只是3行代码:
import asyncio
class Event_ts(asyncio.Event):
#TODO: clear() method
def set(self):
#FIXME: The _loop attribute is not documented as public api!
self._loop.call_soon_threadsafe(super().set)
Run Code Online (Sandbox Code Playgroud)
功能测试:
def threaded(event):
import time
while True:
event.set()
time.sleep(1)
Run Code Online (Sandbox Code Playgroud)
async def main():
import threading
e = Event_ts()
threading.Thread(target=threaded, args=(e,)).start()
while True:
await e.wait()
e.clear()
print('whatever')
Run Code Online (Sandbox Code Playgroud)
asyncio.ensure_future(main())
asyncio.get_event_loop().run_forever()
Run Code Online (Sandbox Code Playgroud)
最简单的方法是完全按照您的建议进行操作 - 将调用包装f.wait()在执行程序中:
@asyncio.coroutine
def do_enqueue():
f = threading.Event()
ch.enqueue_query('2 getnlimit', f)
yield from loop.run_in_executor(None, f.wait)
print(ch.get_query_responses())
Run Code Online (Sandbox Code Playgroud)
您确实会产生启动线程池的开销(至少对于第一次调用,池将从那时起保留在内存中),但是任何提供threading.Event()线程安全阻塞和非阻塞等实现的解决方案APIs,在内部不依赖任何后台线程,会做更多的工作。
小智 6
Huazuo Gau 的答案中的类Event_ts在 Python 3.9 之前运行良好,但在 3.10 中不行。这是因为在Python 3.10中私有属性_loop最初是None.
以下代码适用于 Python 3.10 以及 3.9 及更低版本。(我clear()也添加了该方法。)
import asyncio
class Event_ts(asyncio.Event):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self._loop is None:
self._loop = asyncio.get_event_loop()
def set(self):
self._loop.call_soon_threadsafe(super().set)
def clear(self):
self._loop.call_soon_threadsafe(super().clear)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3317 次 |
| 最近记录: |