标签: python-asyncio

为什么我只能在 async 函数中使用 await 关键字?

假设我有这样的代码

async def fetch_text() -> str:
    return "text "

async def show_something():
    something = await fetch_text()
    print(something)
Run Code Online (Sandbox Code Playgroud)

这很好。但后来我想清理数据,所以我做

async def fetch_text() -> str:
    return "text "

def fetch_clean_text(text: str) -> str:
    text = await fetch_text()
    return text.strip(text)

async def show_something():
    something = fetch_clean_text()
    print(something)
Run Code Online (Sandbox Code Playgroud)

(我可以清理里面的文本show_something(),但让我们假设show_something()可以打印很多东西并且不知道或不应该知道清理它们的正确方法。)

这当然是一个SyntaxError: 'await' outside async function. 但是——如果这段代码可以运行——当await表达式没有放在协程函数中时,它会在协程函数的上下文中执行。为什么不允许这种行为?

我在这个设计中看到了一位专业人士;在我的后一个例子中,你看不到show_something()的身体正在做一些可能导致其暂停的事情。但是如果我要创建fetch_clean_text()一个协程,它不仅会使事情复杂化,而且可能还会降低性能。拥有另一个本身不执行任何 I/O 的协程是没有意义的。有没有更好的办法?

python python-asyncio

16
推荐指数
2
解决办法
4万
查看次数

如果一个任务失败,如何取消收集中的所有剩余任务?

如果 的一个任务gather引发异常,其他任务仍然可以继续。

嗯,这不完全是我需要的。我想区分致命的错误和需要取消所有剩余任务的错误,以及不是而是应该记录的错误,同时允许其他任务继续。

这是我实现这一点的失败尝试:

from asyncio import gather, get_event_loop, sleep

class ErrorThatShouldCancelOtherTasks(Exception):
    pass

async def my_sleep(secs):
    await sleep(secs)
    if secs == 5:
        raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
    print(f'Slept for {secs}secs.')

async def main():
    try:
        sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
        await sleepers
    except ErrorThatShouldCancelOtherTasks:
        print('Fatal error; cancelling')
        sleepers.cancel()
    finally:
        await sleep(5)

get_event_loop().run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

finally await sleep这里是为了防止解释器立即关闭,这会自行取消所有任务)

奇怪的是,呼吁cancelgather实际上并没有取消它!

PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.
Run Code Online (Sandbox Code Playgroud)

我对这种行为感到非常惊讶,因为它似乎 …

python exception cancellation python-3.x python-asyncio

16
推荐指数
3
解决办法
3692
查看次数

python中multiprocessing、asyncio和concurrency.futures的区别

作为使用并发的新手,我对何时使用不同的 Python 并发库感到困惑。据我了解,多处理、多线程和异步编程是并发的一部分,而多处理是称为并行的并发子集的一部分。

我在网上搜索了有关在 python 中处理并发的不同方法,我遇到了多处理库、concurrenct.futures 的 ProcessPoolExecutor() 和 ThreadPoolExecutor() 以及 asyncio。让我困惑的是这些库之间的区别。特别是 multiprocessing 库的作用,因为它有像 pool.apply_async 这样的方法,它是否也做 asyncio 的工作?如果是这样,当它是从 asyncio 实现并发性的不同方法(多进程与协作多任务)时,为什么将其称为多处理?

python multithreading multiprocessing python-asyncio

16
推荐指数
1
解决办法
2618
查看次数

在固定装置中包含代码后,使用 asyncio 进行 pytest 时出现 AttributeError

我需要测试我的电报机器人。为此,我需要创建客户端用户来询问我的机器人。我找到了可以做到这一点的电视马拉松图书馆。首先,我编写了一个代码示例以确保授权和连接正常工作并向自己发送测试消息(省略导入):

api_id = int(os.getenv("TELEGRAM_APP_ID"))
api_hash = os.getenv("TELEGRAM_APP_HASH")
session_str = os.getenv("TELETHON_SESSION")

async def main():
    client = TelegramClient(
        StringSession(session_str), api_id, api_hash,
        sequential_updates=True
    )
    await client.connect()
    async with client.conversation("@someuser") as conv:
        await conv.send_message('Hey, what is your name?')


if __name__ == "__main__":
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

@someuser(我)成功收到消息。好的,现在我根据上面的代码使用固定装置创建一个测试:

api_id = int(os.getenv("TELEGRAM_APP_ID"))
api_hash = os.getenv("TELEGRAM_APP_HASH")
session_str = os.getenv("TELETHON_SESSION")

@pytest.fixture(scope="session")
async def client():
    client = TelegramClient(
        StringSession(session_str), api_id, api_hash,
        sequential_updates=True
    )
    await client.connect()
    yield client
    await client.disconnect()


@pytest.mark.asyncio
async def test_start(client: TelegramClient):
    async with client.conversation("@someuser") as conv:
        await …
Run Code Online (Sandbox Code Playgroud)

python pytest python-asyncio telethon

16
推荐指数
2
解决办法
1万
查看次数

为什么在使用带有asyncio的协同程序的列表解析时会得到不同的结果?

我最初有一些代码将结果汇总到一个列表中.当我重构此代码以使用列表理解时,我得到了意想不到的结果:

import asyncio

@asyncio.coroutine
def coro():
    return "foo"


# Writing the code without a list comp works,
# even with an asyncio.sleep(0.1).
@asyncio.coroutine
def good():
    yield from asyncio.sleep(0.1)
    result = []
    for i in range(3):
        current = yield from coro()
        result.append(current)
    return result


# Using a list comp without an async.sleep(0.1)
# works.
@asyncio.coroutine
def still_good():
    return [(yield from coro()) for i in range(3)]


# Using a list comp along with an asyncio.sleep(0.1)
# does _not_ work.
@asyncio.coroutine
def huh(): …
Run Code Online (Sandbox Code Playgroud)

python python-asyncio

15
推荐指数
1
解决办法
709
查看次数

如何使用asyncio添加连接超时?

我想非常快地连接到很多不同网站的列表.我使用asyncio以异步方式执行此操作,现在想要添加超时,以便在需要花费太长时间来响应时忽略连接.

我该如何实现?

import ssl
import asyncio
from contextlib import suppress
from concurrent.futures import ThreadPoolExecutor
import time


@asyncio.coroutine
def run():
    while True:
        host = yield from q.get()
        if not host:
            break

        with suppress(ssl.CertificateError):
            reader, writer = yield from asyncio.open_connection(host[1], 443, ssl=True) #timout option?
            reader.close()
            writer.close()


@asyncio.coroutine
def load_q():
    # only 3 entries for debugging reasons
    for host in [[1, 'python.org'], [2, 'qq.com'], [3, 'google.com']]:
        yield from q.put(host)
    for _ in range(NUM):
        q.put(None)


if __name__ == "__main__":
    NUM = 1000
    q = …
Run Code Online (Sandbox Code Playgroud)

python asynchronous timeout python-3.x python-asyncio

15
推荐指数
1
解决办法
1万
查看次数

asyncio队列消费者协同程序

我有一个asyncio.Protocol从服务器接收数据的子类.我正在存储这些数据(每行,因为数据是文本)asyncio.Queue.

import asyncio

q = asyncio.Queue()

class StreamProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        for message in data.decode().splitlines():
            yield q.put(message.rstrip())

    def connection_lost(self, exc):
        self.loop.stop()

loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: StreamProtocol(loop),
                              '127.0.0.1', '42')
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
Run Code Online (Sandbox Code Playgroud)

我想让另一个协程负责消耗队列中的数据并进行处理.

  • 这应该是一个asyncio.Task吗?
  • 如果队列变空是因为几秒钟没有收到数据怎么办?如何确保我的消费者不会停止(run_until_complete)?
  • 有没有比使用全局变量更简洁的方法?

python coroutine python-3.x python-asyncio

15
推荐指数
1
解决办法
1万
查看次数

为什么我不能从'异步函数内'中获益?

在Python 3.6中,我可以yield在协程中使用.但是我无法使用yield from.

以下是我的代码.在第3行,我等待另一个协程.在第4行,我尝试yield from一个文件.为什么Python 3.6不允许我这样做?

async def read_file(self, filename):
    with tempfile.NamedTemporaryFile(mode='r', delete=True, dir='/tmp', prefix='sftp') as tmp_file:
        await self.copy_file(filename, tmp_file)
        yield from open(tmp_file)
Run Code Online (Sandbox Code Playgroud)

以下是Python 3.6为上述代码引发的异常:

  File "example.py", line 4
    yield from open(tmp_file)
    ^
SyntaxError: 'yield from' inside async function
Run Code Online (Sandbox Code Playgroud)

python python-3.x async-await python-asyncio python-3.6

15
推荐指数
1
解决办法
3116
查看次数

部分异步函数未被检测为异步

我有一个函数,它接受常规和异步函数(不是协程,而是返回协程的函数)。

它在内部使用asyncio.iscoroutinefunction() test来查看它获得了哪种类型的功能。

最近,当我尝试创建部分异步函数时,它崩溃了。

在这个演示中,ptest 被识别为一个协程函数,即使它返回一个协程,即ptest() 一个协程。

import asyncio
import functools

async def test(arg): pass
print(asyncio.iscoroutinefunction(test))    # True

ptest = functools.partial(test, None)
print(asyncio.iscoroutinefunction(ptest))   # False!!

print(asyncio.iscoroutine(ptest()))         # True
Run Code Online (Sandbox Code Playgroud)

问题原因很清楚,但解决方案却不是。

如何动态创建通过测试的部分异步函数?

或者

如何测试包裹在部分对象中的 func ?

任何一个答案都可以解决问题。

python python-asyncio

15
推荐指数
2
解决办法
5364
查看次数

asyncio.Semaphore RuntimeError: Task got Future 附加到不同的循环

当我在 Python 3.7 中运行此代码时:

import asyncio

sem = asyncio.Semaphore(2)

async def work():
    async with sem:
        print('working')
        await asyncio.sleep(1)

async def main():
    await asyncio.gather(work(), work(), work())

asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

它因运行时错误而失败:

$ python3 demo.py
working
working
Traceback (most recent call last):
  File "demo.py", line 13, in <module>
    asyncio.run(main())
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "demo.py", line 11, in main
    await asyncio.gather(work(), work(), work())
  File "demo.py", line 6, in work
    async with sem:
  File …
Run Code Online (Sandbox Code Playgroud)

python semaphore python-3.x python-asyncio

15
推荐指数
1
解决办法
3575
查看次数