标签: aiohttp

asyncio.Queue() 的目的是什么以及如何正确使用它?

我已经熟悉Python,但我刚刚开始学习“异步编程”和“io任务”主题。在教程中,我看到它asyncio.Queue()与生产者和消费者相关的东西一起使用,但我不太理解它。

有人可以简化一下,并举几个例子吗?

python-3.x python-asyncio aiohttp

3
推荐指数
1
解决办法
6618
查看次数

有效使用多个 Asyncio 队列

我目前正在构建一个项目,需要向各个端点发出多个请求。我将这些请求包装在 Aiohttp 中以允许异步。

问题:
我有三个队列:queue1queue2queue3。此外,我还有三个工作函数(worker1worker2worker3),它们与各自的队列关联。第一个队列立即填充运行前已知的列表 ID。当请求完成并将数据提交到数据库时,它将 ID 传递给queue2。Aworker2将获取此 ID 并请求更多数据。根据这些数据,它将开始生成 ID 列表(与 中的 ID 不同)queue1/queue2worker2会将 ID 放入 中queue3。最后,在提交到数据库之前,worker3将从中获取此 ID并请求更多数据。queue3

问题queue.join()是由于阻塞调用而出现的。每个工作线程都绑定到一个单独的队列,因此连接queue1将阻塞,直到完成。这很好,但它也违背了使用异步的目的。如果不使用join()该程序,则无法检测队列何时完全为空。另一个问题是,当其中一个队列为空但仍有数据尚未添加时,可能会出现静默错误。

基本代码概要如下:

queue1 = asyncio.Queue()
queue2 = asyncio.Queue()
queue3 = asyncio.Queue()

async with aiohttp.ClientSession() as session:
    for i in range(3):
        tasks.append(asyncio.create_task(worker1(queue1)))

    for i in range(3):
        tasks.append(asyncio.create_task(worker2(queue2)))

    for i in range(10): …
Run Code Online (Sandbox Code Playgroud)

python-3.x python-asyncio aiohttp

3
推荐指数
1
解决办法
2594
查看次数

使用 http 代理的 aiohttp https 请求失败

我正在构建一个测试项目来学习 asyncio。我正在构建一个通过代理服务器获取多个页面的程序。

它对于 http 页面工作正常,但对于 https 页面则失败。当我使用常规请求库时,我也可以使用 https 页面,但不能使用 asyncio。

我隔离了损坏的代码:

import aiohttp
import asyncio

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url, proxy="http://192.168.0.2:9001") as response:
            print(await response.text())

loop = asyncio.get_event_loop()
loop.run_until_complete(fetch("https://google.com")) #http websites do work
Run Code Online (Sandbox Code Playgroud)

错误:

Traceback (most recent call last):
  File "C:\Users\GG\AppData\Local\Programs\Python\Python38-32\lib\site-packages\aiohttp\connector.py", line 936, in _wrap_create_connection
    return await self._loop.create_connection(*args, **kwargs)  # type: ignore  # noqa
  File "C:\Users\GG\AppData\Local\Programs\Python\Python38-32\lib\asyncio\base_events.py", line 1050, in create_connection
    transport, protocol = await self._create_connection_transport(
  File "C:\Users\GG\AppData\Local\Programs\Python\Python38-32\lib\asyncio\base_events.py", line 1080, in _create_connection_transport
    await waiter
  File …
Run Code Online (Sandbox Code Playgroud)

python proxy python-asyncio aiohttp

3
推荐指数
1
解决办法
3498
查看次数

如何asyncio.gather成块的任务+使用具有TCP连接限制的信号量?

我有一个大型 (1M) 数据库结果集,我想为每一行调用 REST API。

API 可以接受批量请求,但我不确定如何对rows生成器进行切片,以便每个任务处理一个行列表,例如 10 行。我宁愿不预先读取所有行并坚持使用生成器。

在一个 http 请求中发送my_function一个列表是很容易的,但是呢asyncio.gather?也许其中之一itertools可以提供帮助。

请参阅下面的通用伪代码进行说明:

async def main(rows):
    async with aiohttp.ClientSession() as session:
        tasks = [my_function(row, session) for row in rows]
        return await asyncio.gather(*tasks)

rows = <generator of database rows>
results = asyncio.run(main(rows))
Run Code Online (Sandbox Code Playgroud)

注意:它们results很小,基本上是每行的确认值。

顺便说一句,

  • asyncio.gather()可以(有效)处理的任务数量是否有限制?
  • 当前gather()将所有请求/任务加载到内存中,消耗 50GB(!)。如何即时读取和传递行和任务以减少内存使用?这是asyncio.BoundedSemaphore()用来做什么的吗?
  • TCP 连接限制为 500,因为 REST Web 服务器可以接受这个数量。如果信号量发挥作用,该值应该是多少,即设置信号量> TCP连接限制是否有意义?

aiohttp很棒asyncio但很难理解 - 我同意这篇文章

asyncio 一直在变化,所以要警惕旧的 Stack Overflow …

python-asyncio aiohttp

3
推荐指数
1
解决办法
6347
查看次数

为什么需要重新引发 asyncio.CancelledError ?

我有以下aiohttpWebSocket 处理程序:

async def websocket_handler(request):
  ws = None
  
  if 'my-websocket-clients' not in request.app:
    request.app['my-websocket-clients'] = []
  
  print('Websocket connection starting', request.path, request.query)
  
  try:
    ws = aiohttp.web.WebSocketResponse(autoping=True, heartbeat=10.0, compress=True)
    await ws.prepare(request)
    request.app['my-websocket-clients'].append(ws)
    print('Websocket connection ready', len(request.app['my-websocket-clients']))
    async for msg in ws:
      await websocket_message(request, ws, msg)
  except asyncio.exceptions.CancelledError as e:
    print('Websocket connection was closed uncleanly ' + str(e))
    # ~~~~~~ re-raise here? ~~~~~~
  except:
    traceback.print_exc()
  finally:
    try:
      await ws.close()
    except:
      traceback.print_exc()
  
  if ws in request.app['my-websocket-clients']:
    request.app['my-websocket-clients'].remove(ws)
  
  print('Websocket connection closed', len(request.app['my-websocket-clients']))
  
  if …
Run Code Online (Sandbox Code Playgroud)

python exception websocket python-asyncio aiohttp

3
推荐指数
1
解决办法
2101
查看次数

当 __aenter__() 使用“async with”失败时会发生什么?

PEP 492提到:

async with EXPR as VAR:
    BLOCK
Run Code Online (Sandbox Code Playgroud)

在语义上等同于:

mgr = (EXPR)
aexit = type(mgr).__aexit__
aenter = type(mgr).__aenter__

VAR = await aenter(mgr)
try:
    BLOCK
except:
    if not await aexit(mgr, *sys.exc_info()):
        raise
else:
    await aexit(mgr, None, None, None)
Run Code Online (Sandbox Code Playgroud)

然而,VAR = await aenter(mgr)它不在try区块中,所以我想知道是否__aenter__()允许失败。

例如,在此aiohttp片段中(摘自入门):

import aiohttp
import asyncio

async def main():

    async with aiohttp.ClientSession() as session:
        async with session.get('http://python.org') as response:

            print("Status:", response.status)
            print("Content-type:", response.headers['content-type'])

            html = await response.text()
            print("Body:", html[:15], …
Run Code Online (Sandbox Code Playgroud)

python python-3.x python-asyncio semantics aiohttp

3
推荐指数
1
解决办法
3551
查看次数

如何使用 session.get() 将标头传递给异步

我想知道如何在以下 get 调用中传递标头

headers = {
'User-Agent': 'Mozilla'
}
async def fetch(url, session):
async with session.get(url) as response:
    resp = await response.read()
    return resp
Run Code Online (Sandbox Code Playgroud)

我尝试了以下但没有得到任何回应。

headers = {
'User-Agent': 'Mozilla'
}
async def fetch(url, session):
async with session.get(url, headers=headers) as response:
    resp = await response.read()
    return resp
Run Code Online (Sandbox Code Playgroud)

目的是以异步模式调用不同的 url。需要知道是否还有其他替代方法,但无论如何,都需要传递标头才能获得正确的响应。

python-asyncio aiohttp requests-futures

3
推荐指数
1
解决办法
6203
查看次数

如何使用 asyncio/aiohttp 确定最佳缓冲区大小

在 python 中使用 asyncio 时,我们如何确定 read() 的最佳参数?12字节?100 字节?

async with self._session.get(url, headers=headers) as response:
    chunk_size = 12
    result = ''

    while True:
       chunk = await response.content.read(chunk_size)
          if not chunk:
              break
          elif isinstance(chunk, (bytes, bytearray)):
              data = chunk.decode('utf8')
               result += data
Run Code Online (Sandbox Code Playgroud)

python python-asyncio aiohttp

3
推荐指数
1
解决办法
2350
查看次数

Aiohttp 在浏览器正常打开的某些网站上引发证书错误

代码示例

import aiohttp
import asyncio


async def main(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print("Status:", response.status)
            print("Content-type:", response.headers['content-type'])
            html = await response.text()
            print("Body:", html[:15], "...")


url = "https://shikimori.one/"

loop = asyncio.get_event_loop()
loop.run_until_complete(main(url))
Run Code Online (Sandbox Code Playgroud)

追溯

    Traceback (most recent call last):
  File "D:\projects\parser\test\test_aiohttp.py", line 20, in <module>
    loop.run_until_complete(main(url))
  File "C:\Users\user\AppData\Local\Programs\Python\Python39\lib\asyncio\base_events.py", line 642, in run_until_complete
    return future.result()
  File "D:\projects\parser\test\test_aiohttp.py", line 8, in main
    async with session.get(url) as response:
  File "D:\projects\parser\venv\lib\site-packages\aiohttp\client.py", line 1117, in __aenter__
    self._resp = await self._coro
  File "D:\projects\parser\venv\lib\site-packages\aiohttp\client.py", …
Run Code Online (Sandbox Code Playgroud)

python ssl-certificate aiohttp

3
推荐指数
1
解决办法
1万
查看次数

Python - 同时运行多个异步函数

我本质上是在制作一个 pinger,它有一个密钥/webhook 对的二维列表,并在 ping 一个密钥后,将响应发送到 webhook

二维列表如下:

some_list = [["key1", "webhook1"], ["key2", "webhook2"]]
Run Code Online (Sandbox Code Playgroud)

我的程序本质上是一个循环,我不太确定如何some_list在函数中旋转数据。

这是我的脚本的一个小演示:

some_list = [["key1", "webhook1"], ["key2", "webhook2"]]
Run Code Online (Sandbox Code Playgroud)

我试过了:

async def do_ping(some_pair):
    async with aiohttps.ClientSession() as s:
        tasks = await gen_tasks(s, some_pair)
        results = await asyncio.gather(*tasks*)
        sleep(10)
        await do_ping(some_pair)
Run Code Online (Sandbox Code Playgroud)

但由于该do_ping函数是一个自调用循环,它只是一遍又一遍地调用第一个函数,而永远不会调用后面的函数。希望找到解决方案,无论是线程还是类似的,如果您有更好的构造some_list值的方法(我认为是字典),也请随意放弃该反馈

python python-requests python-asyncio aiohttp

3
推荐指数
1
解决办法
8349
查看次数