我已经熟悉Python,但我刚刚开始学习“异步编程”和“io任务”主题。在教程中,我看到它asyncio.Queue()与生产者和消费者相关的东西一起使用,但我不太理解它。
有人可以简化一下,并举几个例子吗?
我目前正在构建一个项目,需要向各个端点发出多个请求。我将这些请求包装在 Aiohttp 中以允许异步。
问题:
我有三个队列:queue1、queue2和queue3。此外,我还有三个工作函数(worker1、worker2、worker3),它们与各自的队列关联。第一个队列立即填充运行前已知的列表 ID。当请求完成并将数据提交到数据库时,它将 ID 传递给queue2。Aworker2将获取此 ID 并请求更多数据。根据这些数据,它将开始生成 ID 列表(与 中的 ID 不同)queue1/queue2。worker2会将 ID 放入 中queue3。最后,在提交到数据库之前,worker3将从中获取此 ID并请求更多数据。queue3
问题queue.join()是由于阻塞调用而出现的。每个工作线程都绑定到一个单独的队列,因此连接queue1将阻塞,直到完成。这很好,但它也违背了使用异步的目的。如果不使用join()该程序,则无法检测队列何时完全为空。另一个问题是,当其中一个队列为空但仍有数据尚未添加时,可能会出现静默错误。
基本代码概要如下:
queue1 = asyncio.Queue()
queue2 = asyncio.Queue()
queue3 = asyncio.Queue()
async with aiohttp.ClientSession() as session:
for i in range(3):
tasks.append(asyncio.create_task(worker1(queue1)))
for i in range(3):
tasks.append(asyncio.create_task(worker2(queue2)))
for i in range(10): …Run Code Online (Sandbox Code Playgroud) 我正在构建一个测试项目来学习 asyncio。我正在构建一个通过代理服务器获取多个页面的程序。
它对于 http 页面工作正常,但对于 https 页面则失败。当我使用常规请求库时,我也可以使用 https 页面,但不能使用 asyncio。
我隔离了损坏的代码:
import aiohttp
import asyncio
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url, proxy="http://192.168.0.2:9001") as response:
print(await response.text())
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch("https://google.com")) #http websites do work
Run Code Online (Sandbox Code Playgroud)
错误:
Traceback (most recent call last):
File "C:\Users\GG\AppData\Local\Programs\Python\Python38-32\lib\site-packages\aiohttp\connector.py", line 936, in _wrap_create_connection
return await self._loop.create_connection(*args, **kwargs) # type: ignore # noqa
File "C:\Users\GG\AppData\Local\Programs\Python\Python38-32\lib\asyncio\base_events.py", line 1050, in create_connection
transport, protocol = await self._create_connection_transport(
File "C:\Users\GG\AppData\Local\Programs\Python\Python38-32\lib\asyncio\base_events.py", line 1080, in _create_connection_transport
await waiter
File …Run Code Online (Sandbox Code Playgroud) 我有一个大型 (1M) 数据库结果集,我想为每一行调用 REST API。
API 可以接受批量请求,但我不确定如何对rows生成器进行切片,以便每个任务处理一个行列表,例如 10 行。我宁愿不预先读取所有行并坚持使用生成器。
在一个 http 请求中发送my_function一个列表是很容易的,但是呢asyncio.gather?也许其中之一itertools可以提供帮助。
请参阅下面的通用伪代码进行说明:
async def main(rows):
async with aiohttp.ClientSession() as session:
tasks = [my_function(row, session) for row in rows]
return await asyncio.gather(*tasks)
rows = <generator of database rows>
results = asyncio.run(main(rows))
Run Code Online (Sandbox Code Playgroud)
注意:它们results很小,基本上是每行的确认值。
顺便说一句,
asyncio.gather()可以(有效)处理的任务数量是否有限制?gather()将所有请求/任务加载到内存中,消耗 50GB(!)。如何即时读取和传递行和任务以减少内存使用?这是asyncio.BoundedSemaphore()用来做什么的吗?aiohttp很棒asyncio但很难理解 - 我同意这篇文章:
asyncio 一直在变化,所以要警惕旧的 Stack Overflow …
我有以下aiohttpWebSocket 处理程序:
async def websocket_handler(request):
ws = None
if 'my-websocket-clients' not in request.app:
request.app['my-websocket-clients'] = []
print('Websocket connection starting', request.path, request.query)
try:
ws = aiohttp.web.WebSocketResponse(autoping=True, heartbeat=10.0, compress=True)
await ws.prepare(request)
request.app['my-websocket-clients'].append(ws)
print('Websocket connection ready', len(request.app['my-websocket-clients']))
async for msg in ws:
await websocket_message(request, ws, msg)
except asyncio.exceptions.CancelledError as e:
print('Websocket connection was closed uncleanly ' + str(e))
# ~~~~~~ re-raise here? ~~~~~~
except:
traceback.print_exc()
finally:
try:
await ws.close()
except:
traceback.print_exc()
if ws in request.app['my-websocket-clients']:
request.app['my-websocket-clients'].remove(ws)
print('Websocket connection closed', len(request.app['my-websocket-clients']))
if …Run Code Online (Sandbox Code Playgroud) PEP 492提到:
async with EXPR as VAR:
BLOCK
Run Code Online (Sandbox Code Playgroud)
在语义上等同于:
mgr = (EXPR)
aexit = type(mgr).__aexit__
aenter = type(mgr).__aenter__
VAR = await aenter(mgr)
try:
BLOCK
except:
if not await aexit(mgr, *sys.exc_info()):
raise
else:
await aexit(mgr, None, None, None)
Run Code Online (Sandbox Code Playgroud)
然而,VAR = await aenter(mgr)它不在try区块中,所以我想知道是否__aenter__()允许失败。
例如,在此aiohttp片段中(摘自入门):
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession() as session:
async with session.get('http://python.org') as response:
print("Status:", response.status)
print("Content-type:", response.headers['content-type'])
html = await response.text()
print("Body:", html[:15], …Run Code Online (Sandbox Code Playgroud) 我想知道如何在以下 get 调用中传递标头
headers = {
'User-Agent': 'Mozilla'
}
async def fetch(url, session):
async with session.get(url) as response:
resp = await response.read()
return resp
Run Code Online (Sandbox Code Playgroud)
我尝试了以下但没有得到任何回应。
headers = {
'User-Agent': 'Mozilla'
}
async def fetch(url, session):
async with session.get(url, headers=headers) as response:
resp = await response.read()
return resp
Run Code Online (Sandbox Code Playgroud)
目的是以异步模式调用不同的 url。需要知道是否还有其他替代方法,但无论如何,都需要传递标头才能获得正确的响应。
在 python 中使用 asyncio 时,我们如何确定 read() 的最佳参数?12字节?100 字节?
async with self._session.get(url, headers=headers) as response:
chunk_size = 12
result = ''
while True:
chunk = await response.content.read(chunk_size)
if not chunk:
break
elif isinstance(chunk, (bytes, bytearray)):
data = chunk.decode('utf8')
result += data
Run Code Online (Sandbox Code Playgroud) 代码示例
import aiohttp
import asyncio
async def main(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
print("Status:", response.status)
print("Content-type:", response.headers['content-type'])
html = await response.text()
print("Body:", html[:15], "...")
url = "https://shikimori.one/"
loop = asyncio.get_event_loop()
loop.run_until_complete(main(url))
Run Code Online (Sandbox Code Playgroud)
追溯
Traceback (most recent call last):
File "D:\projects\parser\test\test_aiohttp.py", line 20, in <module>
loop.run_until_complete(main(url))
File "C:\Users\user\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 642, in run_until_complete
return future.result()
File "D:\projects\parser\test\test_aiohttp.py", line 8, in main
async with session.get(url) as response:
File "D:\projects\parser\venv\lib\site-packages\aiohttp\client.py", line 1117, in __aenter__
self._resp = await self._coro
File "D:\projects\parser\venv\lib\site-packages\aiohttp\client.py", …Run Code Online (Sandbox Code Playgroud) 我本质上是在制作一个 pinger,它有一个密钥/webhook 对的二维列表,并在 ping 一个密钥后,将响应发送到 webhook
二维列表如下:
some_list = [["key1", "webhook1"], ["key2", "webhook2"]]
Run Code Online (Sandbox Code Playgroud)
我的程序本质上是一个循环,我不太确定如何some_list在函数中旋转数据。
这是我的脚本的一个小演示:
some_list = [["key1", "webhook1"], ["key2", "webhook2"]]
Run Code Online (Sandbox Code Playgroud)
我试过了:
async def do_ping(some_pair):
async with aiohttps.ClientSession() as s:
tasks = await gen_tasks(s, some_pair)
results = await asyncio.gather(*tasks*)
sleep(10)
await do_ping(some_pair)
Run Code Online (Sandbox Code Playgroud)
但由于该do_ping函数是一个自调用循环,它只是一遍又一遍地调用第一个函数,而永远不会调用后面的函数。希望找到解决方案,无论是线程还是类似的,如果您有更好的构造some_list值的方法(我认为是字典),也请随意放弃该反馈