假设我有这样的代码
async def fetch_text() -> str:
return "text "
async def show_something():
something = await fetch_text()
print(something)
Run Code Online (Sandbox Code Playgroud)
这很好。但后来我想清理数据,所以我做
async def fetch_text() -> str:
return "text "
def fetch_clean_text(text: str) -> str:
text = await fetch_text()
return text.strip(text)
async def show_something():
something = fetch_clean_text()
print(something)
Run Code Online (Sandbox Code Playgroud)
(我可以清理里面的文本show_something(),但让我们假设show_something()可以打印很多东西并且不知道或不应该知道清理它们的正确方法。)
这当然是一个SyntaxError: 'await' outside async function. 但是——如果这段代码可以运行——当await表达式没有放在协程函数中时,它会在协程函数的上下文中执行。为什么不允许这种行为?
我在这个设计中看到了一位专业人士;在我的后一个例子中,你看不到show_something()的身体正在做一些可能导致其暂停的事情。但是如果我要创建fetch_clean_text()一个协程,它不仅会使事情复杂化,而且可能还会降低性能。拥有另一个本身不执行任何 I/O 的协程是没有意义的。有没有更好的办法?
如果 的一个任务gather引发异常,其他任务仍然可以继续。
嗯,这不完全是我需要的。我想区分致命的错误和需要取消所有剩余任务的错误,以及不是而是应该记录的错误,同时允许其他任务继续。
这是我实现这一点的失败尝试:
from asyncio import gather, get_event_loop, sleep
class ErrorThatShouldCancelOtherTasks(Exception):
pass
async def my_sleep(secs):
await sleep(secs)
if secs == 5:
raise ErrorThatShouldCancelOtherTasks('5 is forbidden!')
print(f'Slept for {secs}secs.')
async def main():
try:
sleepers = gather(*[my_sleep(secs) for secs in [2, 5, 7]])
await sleepers
except ErrorThatShouldCancelOtherTasks:
print('Fatal error; cancelling')
sleepers.cancel()
finally:
await sleep(5)
get_event_loop().run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
(finally await sleep这里是为了防止解释器立即关闭,这会自行取消所有任务)
奇怪的是,呼吁cancel在gather实际上并没有取消它!
PS C:\Users\m> .\AppData\Local\Programs\Python\Python368\python.exe .\wtf.py
Slept for 2secs.
Fatal error; cancelling
Slept for 7secs.
Run Code Online (Sandbox Code Playgroud)
我对这种行为感到非常惊讶,因为它似乎 …
作为使用并发的新手,我对何时使用不同的 Python 并发库感到困惑。据我了解,多处理、多线程和异步编程是并发的一部分,而多处理是称为并行的并发子集的一部分。
我在网上搜索了有关在 python 中处理并发的不同方法,我遇到了多处理库、concurrenct.futures 的 ProcessPoolExecutor() 和 ThreadPoolExecutor() 以及 asyncio。让我困惑的是这些库之间的区别。特别是 multiprocessing 库的作用,因为它有像 pool.apply_async 这样的方法,它是否也做 asyncio 的工作?如果是这样,当它是从 asyncio 实现并发性的不同方法(多进程与协作多任务)时,为什么将其称为多处理?
我需要测试我的电报机器人。为此,我需要创建客户端用户来询问我的机器人。我找到了可以做到这一点的电视马拉松图书馆。首先,我编写了一个代码示例以确保授权和连接正常工作并向自己发送测试消息(省略导入):
api_id = int(os.getenv("TELEGRAM_APP_ID"))
api_hash = os.getenv("TELEGRAM_APP_HASH")
session_str = os.getenv("TELETHON_SESSION")
async def main():
client = TelegramClient(
StringSession(session_str), api_id, api_hash,
sequential_updates=True
)
await client.connect()
async with client.conversation("@someuser") as conv:
await conv.send_message('Hey, what is your name?')
if __name__ == "__main__":
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
@someuser(我)成功收到消息。好的,现在我根据上面的代码使用固定装置创建一个测试:
api_id = int(os.getenv("TELEGRAM_APP_ID"))
api_hash = os.getenv("TELEGRAM_APP_HASH")
session_str = os.getenv("TELETHON_SESSION")
@pytest.fixture(scope="session")
async def client():
client = TelegramClient(
StringSession(session_str), api_id, api_hash,
sequential_updates=True
)
await client.connect()
yield client
await client.disconnect()
@pytest.mark.asyncio
async def test_start(client: TelegramClient):
async with client.conversation("@someuser") as conv:
await …Run Code Online (Sandbox Code Playgroud) 我最初有一些代码将结果汇总到一个列表中.当我重构此代码以使用列表理解时,我得到了意想不到的结果:
import asyncio
@asyncio.coroutine
def coro():
return "foo"
# Writing the code without a list comp works,
# even with an asyncio.sleep(0.1).
@asyncio.coroutine
def good():
yield from asyncio.sleep(0.1)
result = []
for i in range(3):
current = yield from coro()
result.append(current)
return result
# Using a list comp without an async.sleep(0.1)
# works.
@asyncio.coroutine
def still_good():
return [(yield from coro()) for i in range(3)]
# Using a list comp along with an asyncio.sleep(0.1)
# does _not_ work.
@asyncio.coroutine
def huh(): …Run Code Online (Sandbox Code Playgroud) 我想非常快地连接到很多不同网站的列表.我使用asyncio以异步方式执行此操作,现在想要添加超时,以便在需要花费太长时间来响应时忽略连接.
我该如何实现?
import ssl
import asyncio
from contextlib import suppress
from concurrent.futures import ThreadPoolExecutor
import time
@asyncio.coroutine
def run():
while True:
host = yield from q.get()
if not host:
break
with suppress(ssl.CertificateError):
reader, writer = yield from asyncio.open_connection(host[1], 443, ssl=True) #timout option?
reader.close()
writer.close()
@asyncio.coroutine
def load_q():
# only 3 entries for debugging reasons
for host in [[1, 'python.org'], [2, 'qq.com'], [3, 'google.com']]:
yield from q.put(host)
for _ in range(NUM):
q.put(None)
if __name__ == "__main__":
NUM = 1000
q = …Run Code Online (Sandbox Code Playgroud) 我有一个asyncio.Protocol从服务器接收数据的子类.我正在存储这些数据(每行,因为数据是文本)asyncio.Queue.
import asyncio
q = asyncio.Queue()
class StreamProtocol(asyncio.Protocol):
def __init__(self, loop):
self.loop = loop
self.transport = None
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
for message in data.decode().splitlines():
yield q.put(message.rstrip())
def connection_lost(self, exc):
self.loop.stop()
loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: StreamProtocol(loop),
'127.0.0.1', '42')
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
Run Code Online (Sandbox Code Playgroud)
我想让另一个协程负责消耗队列中的数据并进行处理.
asyncio.Task吗?run_until_complete)?在Python 3.6中,我可以yield在协程中使用.但是我无法使用yield from.
以下是我的代码.在第3行,我等待另一个协程.在第4行,我尝试yield from一个文件.为什么Python 3.6不允许我这样做?
async def read_file(self, filename):
with tempfile.NamedTemporaryFile(mode='r', delete=True, dir='/tmp', prefix='sftp') as tmp_file:
await self.copy_file(filename, tmp_file)
yield from open(tmp_file)
Run Code Online (Sandbox Code Playgroud)
以下是Python 3.6为上述代码引发的异常:
File "example.py", line 4
yield from open(tmp_file)
^
SyntaxError: 'yield from' inside async function
Run Code Online (Sandbox Code Playgroud) 我有一个函数,它接受常规和异步函数(不是协程,而是返回协程的函数)。
它在内部使用asyncio.iscoroutinefunction() test来查看它获得了哪种类型的功能。
最近,当我尝试创建部分异步函数时,它崩溃了。
在这个演示中,ptest 不被识别为一个协程函数,即使它返回一个协程,即ptest() 是一个协程。
import asyncio
import functools
async def test(arg): pass
print(asyncio.iscoroutinefunction(test)) # True
ptest = functools.partial(test, None)
print(asyncio.iscoroutinefunction(ptest)) # False!!
print(asyncio.iscoroutine(ptest())) # True
Run Code Online (Sandbox Code Playgroud)
问题原因很清楚,但解决方案却不是。
如何动态创建通过测试的部分异步函数?
或者
如何测试包裹在部分对象中的 func ?
任何一个答案都可以解决问题。
当我在 Python 3.7 中运行此代码时:
import asyncio
sem = asyncio.Semaphore(2)
async def work():
async with sem:
print('working')
await asyncio.sleep(1)
async def main():
await asyncio.gather(work(), work(), work())
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
它因运行时错误而失败:
$ python3 demo.py
working
working
Traceback (most recent call last):
File "demo.py", line 13, in <module>
asyncio.run(main())
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "demo.py", line 11, in main
await asyncio.gather(work(), work(), work())
File "demo.py", line 6, in work
async with sem:
File …Run Code Online (Sandbox Code Playgroud) python ×10
python-asyncio ×10
python-3.x ×5
async-await ×1
asynchronous ×1
cancellation ×1
coroutine ×1
exception ×1
pytest ×1
python-3.6 ×1
semaphore ×1
telethon ×1
timeout ×1