我正在尝试制作一个装饰器来包装协程或函数。
我尝试的第一件事是包装器中的简单重复代码:
def duration(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_ts = time.time()
result = func(*args, **kwargs)
dur = time.time() - start_ts
print('{} took {:.2} seconds'.format(func.__name__, dur))
return result
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
start_ts = time.time()
result = await func(*args, **kwargs)
dur = time.time() - start_ts
print('{} took {:.2} seconds'.format(func.__name__, dur))
return result
if asyncio.iscoroutinefunction(func):
return async_wrapper
else:
return wrapper
Run Code Online (Sandbox Code Playgroud)
这有效,但我想避免代码重复,因为这并不比编写两个单独的装饰器好多少。
然后我尝试使用类制作一个装饰器:
class SyncAsyncDuration:
def __init__(self):
self.start_ts = None
def __call__(self, func):
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
self.setup(func, args, kwargs)
result = …Run Code Online (Sandbox Code Playgroud) 我正在尝试创建一个可以传递给 asyncio.gather() 的协程列表
但是,当我将它们附加到列表中时,我想将参数附加到这些协程。
下面显示的我当前的方法使用 functools.partial。不幸的是 asyncio.gather 不接受部分函数,这是有道理的。
对我来说没有意义的是如何找到解决方案。
示例代码:
async def test(arg1):
print(arg1)
statements = []
function = functools.partial(test, "hello world")
statements.append(function)
results = await asyncio.gather(*statements)
Run Code Online (Sandbox Code Playgroud)
那么如何将参数附加到函数,以便它仍然可以传递给 asyncio.gather?
*编辑
看来我是比较傻了。
我的解决方案相当简单,不要使用 functools.partial,只需将协程直接附加到列表中即可。
代码:
async def test(arg1):
print(arg1)
async def main():
statements = []
statements.append(test("hello_world"))
results = await asyncio.gather(*statements)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud) 我asyncio以一种非常基本的方式用于应用程序。检查互联网上的大多数教程(甚至官方文档),我看到他们使用get_event_loop()和loop.run_until_complete():
import asyncio
async def say(what, when):
await asyncio.sleep(when)
print(what)
loop = asyncio.get_event_loop()
loop.run_until_complete(say('hello world', 1))
loop.close()
Run Code Online (Sandbox Code Playgroud)
但是在Python 3.7 docs 中,我们可以阅读:
应用程序开发人员通常应该使用高级 asyncio 函数,例如asyncio.run(),并且很少需要引用循环对象或调用其方法。本节主要面向需要更好地控制事件循环行为的低级代码、库和框架的作者。
我发现它更简洁、更易于使用,但它仅适用于 Python 3.7+。所以在这里我必须做出选择,是使用 Python 3.7+run()还是使其与 Python 3.6 兼容并使用事件循环。你会如何处理这个?有没有一种简单的方法可以使它与 Python 3.6 向后兼容?在 Python 3.7 成为通用版本之前,我是否应该首先检查 Python 版本并基于此使用一种或另一种方式?
我正在编写一个我希望最终用户可以选择使用的库,就像它的方法和函数不是协同程序一样.
例如,给定此功能:
@asyncio.coroutine
def blah_getter():
return (yield from http_client.get('http://blahblahblah'))
Run Code Online (Sandbox Code Playgroud)
不关心在自己的代码中使用任何异步功能的最终用户仍然需要导入asyncio并运行:
>>> response = asyncio.get_event_loop().run_until_complete(blah_getter())
Run Code Online (Sandbox Code Playgroud)
如果可以的话,这将是很酷的,blah_getter确定我是否被称为协程,并做出相应的反应.
所以类似于:
@asyncio.coroutine
def blah_getter():
if magically_determine_if_being_yielded_from():
return (yield from http_client.get('http://blahblahblah'))
else:
el = asyncio.get_event_loop()
return el.run_until_complete(http_client.get('http://blahblahblah'))
Run Code Online (Sandbox Code Playgroud) 我正在编写一个连接到X个UNIX套接字的工具,发送命令并将输出保存在本地文件系统中.它每X秒运行一次.为了在工具接收终止信号时执行一些清理,我将一个函数(关闭)注册到signal.SIGHUP和signal.SIGTERM信号.此函数取消所有任务,然后关闭事件循环.
我的问题是我得到了
RuntimeError:事件循环在Future完成之前停止
当我发送signal.SIGTERM(杀'pid').我已经阅读了两次取消任务的文档,但我没有发现我在这里做错了什么.
我也注意到一些奇怪的事情,当我发送终止信号时,程序处于睡眠模式,我在日志中看到它唤醒了pull_stats()协程,你可以在日志的前两行看到这一点.
日志:
21:53:44,194 [23031] [MainThread:supervisor ] DEBUG **sleeping for 9.805s secs**
21:53:45,857 [23031] [MainThread:pull_stats ] INFO pull statistics
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX …Run Code Online (Sandbox Code Playgroud) 我想我想通过制作一个允许你在一个下载多个资源的简单脚本来学习新的python异步等待语法,更具体地说是asyncio模块.
但是现在我被卡住了.
在研究时,我遇到了两个限制并发请求数量的选项:
是否有首选选项,或者如果您只想限制并发连接数,它们是否可以互换使用?性能方面(大致)是否相等?
两者似乎都有默认值100并发连接/操作.如果我只使用信号量限制为500,那么aiohttp内部会隐式地将我锁定为100个并发连接吗?
这对我来说都是非常新的和不清楚的.请随时指出我的任何误解或我的代码中的缺陷.
这是我的代码目前包含两个选项(我应该删除哪些?):
奖金问题:
小号
import asyncio
from tqdm import tqdm
import uvloop as uvloop
from aiohttp import ClientSession, TCPConnector, BasicAuth
# You can ignore this class
class DummyDataHandler(DataHandler):
"""Takes data and stores it somewhere"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def take(self, origin_url, data):
return True
def done(self):
return None
class AsyncDownloader(object):
def __init__(self, concurrent_connections=100, silent=False, data_handler=None, loop_policy=None):
self.concurrent_connections = concurrent_connections
self.silent = silent
self.data_handler = data_handler or DummyDataHandler()
self.sending_bar = …Run Code Online (Sandbox Code Playgroud) 假设我有这样的代码
async def fetch_text() -> str:
return "text "
async def show_something():
something = await fetch_text()
print(something)
Run Code Online (Sandbox Code Playgroud)
这很好。但后来我想清理数据,所以我做
async def fetch_text() -> str:
return "text "
def fetch_clean_text(text: str) -> str:
text = await fetch_text()
return text.strip(text)
async def show_something():
something = fetch_clean_text()
print(something)
Run Code Online (Sandbox Code Playgroud)
(我可以清理里面的文本show_something(),但让我们假设show_something()可以打印很多东西并且不知道或不应该知道清理它们的正确方法。)
这当然是一个SyntaxError: 'await' outside async function. 但是——如果这段代码可以运行——当await表达式没有放在协程函数中时,它会在协程函数的上下文中执行。为什么不允许这种行为?
我在这个设计中看到了一位专业人士;在我的后一个例子中,你看不到show_something()的身体正在做一些可能导致其暂停的事情。但是如果我要创建fetch_clean_text()一个协程,它不仅会使事情复杂化,而且可能还会降低性能。拥有另一个本身不执行任何 I/O 的协程是没有意义的。有没有更好的办法?
如果 的一个任务gather引发异常,其他任务仍然可以继续。
嗯,这不完全是我需要的。我想区分致命的错误和需要取消所有剩余任务的错误,以及不是而是应该记录的错误,同时允许其他任务继续。
这是我实现这一点的失败尝试:
from asyncio import gather, get_event_loop, sleep
class ErrorThatShouldCancelOtherTasks(Exception):
pass
async def my_sleep(secs):
await sleep(secs)
if secs == 5:
raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
print(f'Slept for {secs}secs.')
async def main():
try:
sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
await sleepers
except ErrorThatShouldCancelOtherTasks:
print('Fatal error; cancelling')
sleepers.cancel()
finally:
await sleep(5)
get_event_loop().run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
(finally await sleep这里是为了防止解释器立即关闭,这会自行取消所有任务)
奇怪的是,呼吁cancel在gather实际上并没有取消它!
PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.
Run Code Online (Sandbox Code Playgroud)
我对这种行为感到非常惊讶,因为它似乎 …
中的许多函数asyncio都有不推荐使用的loop参数,计划在 Python 3.10 中删除。实例包括as_completed(),sleep(),和wait()。
我正在寻找有关这些参数及其删除的一些历史背景。
loop解决了哪些问题?一开始为什么要使用它?loop?为什么要大量删除?loop,现在它消失了?作为使用并发的新手,我对何时使用不同的 Python 并发库感到困惑。据我了解,多处理、多线程和异步编程是并发的一部分,而多处理是称为并行的并发子集的一部分。
我在网上搜索了有关在 python 中处理并发的不同方法,我遇到了多处理库、concurrenct.futures 的 ProcessPoolExecutor() 和 ThreadPoolExecutor() 以及 asyncio。让我困惑的是这些库之间的区别。特别是 multiprocessing 库的作用,因为它有像 pool.apply_async 这样的方法,它是否也做 asyncio 的工作?如果是这样,当它是从 asyncio 实现并发性的不同方法(多进程与协作多任务)时,为什么将其称为多处理?
python-asyncio ×10
python ×8
python-3.x ×4
aiohttp ×1
async-await ×1
cancellation ×1
decorator ×1
deprecated ×1
exception ×1
python-3.5 ×1