Gri*_*mar 16 python asynchronous coroutine async-await python-asyncio
当实现在同步和异步应用程序中都有用的类时,我发现自己在两个用例中维护着几乎相同的代码。
仅作为示例,请考虑:
from time import sleep
import asyncio
class UselessExample:
def __init__(self, delay):
self.delay = delay
async def a_ticker(self, to):
for i in range(to):
yield i
await asyncio.sleep(self.delay)
def ticker(self, to):
for i in range(to):
yield i
sleep(self.delay)
def func(ue):
for value in ue.ticker(5):
print(value)
async def a_func(ue):
async for value in ue.a_ticker(5):
print(value)
def main():
ue = UselessExample(1)
func(ue)
loop = asyncio.get_event_loop()
loop.run_until_complete(a_func(ue))
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
在此示例中,还算不错,串联的ticker
方法UselessExample
易于维护,但是您可以想象异常处理和更复杂的功能可以迅速增加方法并使之成为问题,即使这两种方法实际上都可以保留下来相同(仅将某些元素替换为它们的异步对应元素)。
假设没有什么实质性差异值得完全实现,那么维护这样的类并避免不必要的重复的最佳方法(也是最Pythonic的)是什么?
Mar*_*ers 12
要使基于异步协程的异步代码库可用于传统同步代码库,没有一条千篇一律的方法。您必须根据代码路径进行选择。
从一系列工具中选择:
async.run()
在协程周围提供同步包装,这些包装会阻塞直到协程完成。
甚至异步生成器函数(例如)也ticker()
可以通过这种方式在循环中处理:
class UselessExample:
def __init__(self, delay):
self.delay = delay
async def a_ticker(self, to):
for i in range(to):
yield i
await asyncio.sleep(self.delay)
def ticker(self, to):
agen = self.a_ticker(to)
try:
while True:
yield asyncio.run(agen.__anext__())
except StopAsyncIteration:
return
Run Code Online (Sandbox Code Playgroud)
这些同步包装器可以使用辅助函数生成:
from functools import wraps
def sync_agen_method(agen_method):
@wraps(agen_method)
def wrapper(self, *args, **kwargs):
agen = agen_method(self, *args, **kwargs)
try:
while True:
yield asyncio.run(agen.__anext__())
except StopAsyncIteration:
return
if wrapper.__name__[:2] == 'a_':
wrapper.__name__ = wrapper.__name__[2:]
return wrapper
Run Code Online (Sandbox Code Playgroud)
然后只ticker = sync_agen_method(a_ticker)
在类定义中使用。
简化协程方法(而不是生成协程)可以用以下方法包装:
def sync_method(async_method):
@wraps(async_method)
def wrapper(self, *args, **kwargs):
return async.run(async_method(self, *args, **kwargs))
if wrapper.__name__[:2] == 'a_':
wrapper.__name__ = wrapper.__name__[2:]
return wrapper
Run Code Online (Sandbox Code Playgroud)
将同步部分重构为生成器,上下文管理器,实用程序功能等。
对于您的特定示例,将for
循环拉出到单独的生成器中会将重复的代码最小化为两个版本休眠的方式:
class UselessExample:
def __init__(self, delay):
self.delay = delay
def _ticker_gen(self, to):
yield from range(to)
async def a_ticker(self, to):
for i in self._ticker_gen(to):
yield i
await asyncio.sleep(self.delay)
def ticker(self, to):
for i in self._ticker_gen(to):
yield i
sleep(self.delay)
Run Code Online (Sandbox Code Playgroud)
尽管这并没有太大的区别,但可以在其他情况下使用。
使用AST重写和映射将协程转换为同步代码。这可能是,如果你不小心,你如何认识的实用功能,如相当脆弱asyncio.sleep()
VS time.sleep()
:
import inspect
import ast
import copy
import textwrap
import time
asynciomap = {
# asyncio function to (additional globals, replacement source) tuples
"sleep": ({"time": time}, "time.sleep")
}
class AsyncToSync(ast.NodeTransformer):
def __init__(self):
self.globals = {}
def visit_AsyncFunctionDef(self, node):
return ast.copy_location(
ast.FunctionDef(
node.name,
self.visit(node.args),
[self.visit(stmt) for stmt in node.body],
[self.visit(stmt) for stmt in node.decorator_list],
node.returns and ast.visit(node.returns),
),
node,
)
def visit_Await(self, node):
return self.visit(node.value)
def visit_Attribute(self, node):
if (
isinstance(node.value, ast.Name)
and isinstance(node.value.ctx, ast.Load)
and node.value.id == "asyncio"
and node.attr in asynciomap
):
g, replacement = asynciomap[node.attr]
self.globals.update(g)
return ast.copy_location(
ast.parse(replacement, mode="eval").body,
node
)
return node
def transform_sync(f):
filename = inspect.getfile(f)
lines, lineno = inspect.getsourcelines(f)
ast_tree = ast.parse(textwrap.dedent(''.join(lines)), filename)
ast.increment_lineno(ast_tree, lineno - 1)
transformer = AsyncToSync()
transformer.visit(ast_tree)
tranformed_globals = {**f.__globals__, **transformer.globals}
exec(compile(ast_tree, filename, 'exec'), tranformed_globals)
return tranformed_globals[f.__name__]
Run Code Online (Sandbox Code Playgroud)
尽管以上内容可能还远远不够满足所有需求,并且转换AST树可能令人生畏,但以上内容将使您仅维护异步版本并将该版本直接映射到同步版本:
>>> import example
>>> del example.UselessExample.ticker
>>> example.main()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../example.py", line 32, in main
func(ue)
File "/.../example.py", line 21, in func
for value in ue.ticker(5):
AttributeError: 'UselessExample' object has no attribute 'ticker'
>>> example.UselessExample.ticker = transform_sync(example.UselessExample.a_ticker)
>>> example.main()
0
1
2
3
4
0
1
2
3
4
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
585 次 |
最近记录: |