标签: python-asyncio

如何创建一个可以包装协程或函数的 Python 装饰器?

我正在尝试制作一个装饰器来包装协程或函数。

我尝试的第一件事是包装器中的简单重复代码:

def duration(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_ts = time.time()
        result = func(*args, **kwargs)
        dur = time.time() - start_ts
        print('{} took {:.2} seconds'.format(func.__name__, dur))
        return result

    @functools.wraps(func)
    async def async_wrapper(*args, **kwargs):
        start_ts = time.time()
        result = await func(*args, **kwargs)
        dur = time.time() - start_ts
        print('{} took {:.2} seconds'.format(func.__name__, dur))
        return result

    if asyncio.iscoroutinefunction(func):
        return async_wrapper
    else:
        return wrapper
Run Code Online (Sandbox Code Playgroud)

这有效,但我想避免代码重复,因为这并不比编写两个单独的装饰器好多少。

然后我尝试使用类制作一个装饰器:

class SyncAsyncDuration:
    def __init__(self):
        self.start_ts = None

    def __call__(self, func):
        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            self.setup(func, args, kwargs)
            result = …
Run Code Online (Sandbox Code Playgroud)

decorator python-asyncio

17
推荐指数
3
解决办法
6162
查看次数

Python3 如何 asyncio.gather() 部分函数的列表

我正在尝试创建一个可以传递给 asyncio.gather() 的协程列表

但是,当我将它们附加到列表中时,我想将参数附加到这些协程。

下面显示的我当前的方法使用 functools.partial。不幸的是 asyncio.gather 不接受部分函数,​​这是有道理的。

对我来说没有意义的是如何找到解决方案。

示例代码:

async def test(arg1):
    print(arg1)

statements = []

function = functools.partial(test, "hello world")
statements.append(function)

results = await asyncio.gather(*statements)
Run Code Online (Sandbox Code Playgroud)

那么如何将参数附加到函数,以便它仍然可以传递给 asyncio.gather?

*编辑

看来我是比较傻了。

我的解决方案相当简单,不要使用 functools.partial,只需将协程直接附加到列表中即可。

代码:

async def test(arg1):
    print(arg1)

async def main():
    statements = []
    statements.append(test("hello_world"))
    results = await asyncio.gather(*statements)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

python-3.x python-asyncio

17
推荐指数
2
解决办法
2万
查看次数

asyncio run 或 run_until_complete

asyncio以一种非常基本的方式用于应用程序。检查互联网上的大多数教程(甚至官方文档),我看到他们使用get_event_loop()loop.run_until_complete()

import asyncio

async def say(what, when):
    await asyncio.sleep(when)
    print(what)

loop = asyncio.get_event_loop()
loop.run_until_complete(say('hello world', 1))
loop.close()
Run Code Online (Sandbox Code Playgroud)

但是在Python 3.7 docs 中,我们可以阅读:

应用程序开发人员通常应该使用高级 asyncio 函数,例如asyncio.run(),并且很少需要引用循环对象或调用其方法。本节主要面向需要更好地控制事件循环行为的低级代码、库和框架的作者。

我发现它更简洁、更易于使用,但它仅适用于 Python 3.7+。所以在这里我必须做出选择,是使用 Python 3.7+run()还是使其与 Python 3.6 兼容并使用事件循环。你会如何处理这个?有没有一种简单的方法可以使它与 Python 3.6 向后兼容?在 Python 3.7 成为通用版本之前,我是否应该首先检查 Python 版本并基于此使用一种或另一种方式?

python python-asyncio

17
推荐指数
1
解决办法
1万
查看次数

如何编写可选择作为常规函数的asyncio协同程序?

我正在编写一个我希望最终用户可以选择使用的库,就像它的方法和函数不是协同程序一样.

例如,给定此功能:

@asyncio.coroutine
def blah_getter():
    return (yield from http_client.get('http://blahblahblah'))
Run Code Online (Sandbox Code Playgroud)

不关心在自己的代码中使用任何异步功能的最终用户仍然需要导入asyncio并运行:

>>> response = asyncio.get_event_loop().run_until_complete(blah_getter())
Run Code Online (Sandbox Code Playgroud)

如果可以的话,这将是很酷的,blah_getter确定我是否被称为协程,并做出相应的反应.

所以类似于:

@asyncio.coroutine
def blah_getter():
    if magically_determine_if_being_yielded_from():
        return (yield from http_client.get('http://blahblahblah'))
    else:
        el = asyncio.get_event_loop()
        return el.run_until_complete(http_client.get('http://blahblahblah'))
Run Code Online (Sandbox Code Playgroud)

python python-3.x python-asyncio

16
推荐指数
2
解决办法
6006
查看次数

正确关闭asyncio任务的方法

我正在编写一个连接到X个UNIX套接字的工具,发送命令并将输出保存在本地文件系统中.它每X秒运行一次.为了在工具接收终止信号时执行一些清理,我将一个函数(关闭)注册到signal.SIGHUP和signal.SIGTERM信号.此函数取消所有任务,然后关闭事件循环.

我的问题是我得到了

RuntimeError:事件循环在Future完成之前停止

当我发送signal.SIGTERM(杀'pid').我已经阅读了两次取消任务的文档,但我没有发现我在这里做错了什么.

我也注意到一些奇怪的事情,当我发送终止信号时,程序处于睡眠模式,我在日志中看到它唤醒了pull_stats()协程,你可以在日志的前两行看到这一点.

日志:

21:53:44,194 [23031] [MainThread:supervisor  ] DEBUG    **sleeping for 9.805s secs**
21:53:45,857 [23031] [MainThread:pull_stats  ] INFO     pull statistics
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX …
Run Code Online (Sandbox Code Playgroud)

python python-asyncio

16
推荐指数
1
解决办法
2万
查看次数

aiohttp.TCPConnector(带限制参数)vs asyncio.Semaphore用于限制并发连接数

我想我想通过制作一个允许你在一个下载多个资源的简单脚本来学习新的python异步等待语法,更具体地说是asyncio模块.

但是现在我被卡住了.

在研究时,我遇到了两个限制并发请求数量的选项:

  1. 将aiohttp.TCPConnector(带限制参数)传递给aiohttp.ClientSession或
  2. 使用asyncio.Semaphore.

是否有首选选项,或者如果您只想限制并发连接数,它们是否可以互换使用?性能方面(大致)是否相等?

两者似乎都有默认值100并发连接/操作.如果我只使用信号量限制为500,那么aiohttp内部会隐式地将我锁定为100个并发连接吗?

这对我来说都是非常新的和不清楚的.请随时指出我的任何误解或我的代码中的缺陷.

这是我的代码目前包含两个选项(我应该删除哪些?):

奖金问题:

  1. 如何处理(最好重试x次)出现错误的coros?
  2. coro完成后,保存返回数据(通知我的DataHandler)的最佳方法是什么?我不希望最后全部保存,因为我可以尽快开始处理结果.

小号

import asyncio
from tqdm import tqdm
import uvloop as uvloop
from aiohttp import ClientSession, TCPConnector, BasicAuth

# You can ignore this class
class DummyDataHandler(DataHandler):
    """Takes data and stores it somewhere"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def take(self, origin_url, data):
        return True

    def done(self):
        return None

class AsyncDownloader(object):
    def __init__(self, concurrent_connections=100, silent=False, data_handler=None, loop_policy=None):

        self.concurrent_connections = concurrent_connections
        self.silent = silent

        self.data_handler = data_handler or DummyDataHandler()

        self.sending_bar = …
Run Code Online (Sandbox Code Playgroud)

python async-await python-asyncio aiohttp python-3.5

16
推荐指数
1
解决办法
823
查看次数

为什么我只能在 async 函数中使用 await 关键字?

假设我有这样的代码

async def fetch_text() -> str:
    return "text "

async def show_something():
    something = await fetch_text()
    print(something)
Run Code Online (Sandbox Code Playgroud)

这很好。但后来我想清理数据,所以我做

async def fetch_text() -> str:
    return "text "

def fetch_clean_text(text: str) -> str:
    text = await fetch_text()
    return text.strip(text)

async def show_something():
    something = fetch_clean_text()
    print(something)
Run Code Online (Sandbox Code Playgroud)

(我可以清理里面的文本show_something(),但让我们假设show_something()可以打印很多东西并且不知道或不应该知道清理它们的正确方法。)

这当然是一个SyntaxError: 'await' outside async function. 但是——如果这段代码可以运行——当await表达式没有放在协程函数中时,它会在协程函数的上下文中执行。为什么不允许这种行为?

我在这个设计中看到了一位专业人士;在我的后一个例子中,你看不到show_something()的身体正在做一些可能导致其暂停的事情。但是如果我要创建fetch_clean_text()一个协程,它不仅会使事情复杂化,而且可能还会降低性能。拥有另一个本身不执行任何 I/O 的协程是没有意义的。有没有更好的办法?

python python-asyncio

16
推荐指数
2
解决办法
4万
查看次数

如果一个任务失败,如何取消收集中的所有剩余任务?

如果 的一个任务gather引发异常,其他任务仍然可以继续。

嗯,这不完全是我需要的。我想区分致命的错误和需要取消所有剩余任务的错误,以及不是而是应该记录的错误,同时允许其他任务继续。

这是我实现这一点的失败尝试:

from asyncio import gather, get_event_loop, sleep

class ErrorThatShouldCancelOtherTasks(Exception):
    pass

async def my_sleep(secs):
    await sleep(secs)
    if secs == 5:
        raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
    print(f'Slept for {secs}secs.')

async def main():
    try:
        sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
        await sleepers
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        sleepers.cancel()
    finally:
        await sleep(5)

get_event_loop().run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

finally await sleep这里是为了防止解释器立即关闭,这会自行取消所有任务)

奇怪的是,呼吁cancelgather实际上并没有取消它!

PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.
Run Code Online (Sandbox Code Playgroud)

我对这种行为感到非常惊讶,因为它似乎 …

python exception cancellation python-3.x python-asyncio

16
推荐指数
3
解决办法
3692
查看次数

asyncio 中所有这些已弃用的“循环”参数是什么?

中的许多函数asyncio都有不推荐使用的loop参数,计划在 Python 3.10 中删除。实例包括as_completed()sleep(),和wait()

我正在寻找有关这些参数及其删除的一些历史背景。

  • loop解决了哪些问题?一开始为什么要使用它?
  • 出了什么问题loop?为什么要大量删除?
  • 什么取代了loop,现在它消失了?

python deprecated python-3.x python-asyncio

16
推荐指数
1
解决办法
3232
查看次数

python中multiprocessing、asyncio和concurrency.futures的区别

作为使用并发的新手,我对何时使用不同的 Python 并发库感到困惑。据我了解,多处理、多线程和异步编程是并发的一部分,而多处理是称为并行的并发子集的一部分。

我在网上搜索了有关在 python 中处理并发的不同方法,我遇到了多处理库、concurrenct.futures 的 ProcessPoolExecutor() 和 ThreadPoolExecutor() 以及 asyncio。让我困惑的是这些库之间的区别。特别是 multiprocessing 库的作用,因为它有像 pool.apply_async 这样的方法,它是否也做 asyncio 的工作?如果是这样,当它是从 asyncio 实现并发性的不同方法(多进程与协作多任务)时,为什么将其称为多处理?

python multithreading multiprocessing python-asyncio

16
推荐指数
1
解决办法
2618
查看次数