Ami*_*mar 8 python asynchronous python-3.x python-asyncio aiohttp
我想从网站下载/抓取 5000 万条日志记录。我没有一次性下载 5000 万个,而是尝试使用以下代码一次下载 1000 万个,但它一次只能处理 20,000 个(超过这个数量会引发错误),因此下载那么多数据变得非常耗时。目前下载 20000 条记录需要 3-4 分钟:100%|██████████| 20000/20000 [03:48<00:00, 87.41it/s]。那么如何加速呢?
import asyncio\nimport aiohttp\nimport time\nimport tqdm\nimport nest_asyncio\n\nnest_asyncio.apply()\n\n\nasync def make_numbers(numbers, _numbers):\n for i in range(numbers, _numbers):\n yield i\n\n\nn = 0\nq = 10000000\n\n\nasync def fetch():\n # example\n url = "https://httpbin.org/anything/log?id="\n\n async with aiohttp.ClientSession() as session:\n post_tasks = []\n # prepare the coroutines that poat\n async for x in make_numbers(n, q):\n post_tasks.append(do_get(session, url, x))\n # now execute them all at once\n\n responses = [await f for f in tqdm.tqdm(asyncio.as_completed(post_tasks), total=len(post_tasks))]\n\n\nasync def do_get(session, url, x):\n headers = {\n 'Content-Type': "application/x-www-form-urlencoded",\n 'Access-Control-Allow-Origin': "*",\n 'Accept-Encoding': "gzip, deflate",\n 'Accept-Language': "en-US"\n }\n\n async with session.get(url + str(x), headers=headers) as response:\n data = await response.text()\n print(data)\n\n\ns = time.perf_counter()\ntry:\n loop = asyncio.get_event_loop()\n loop.run_until_complete(fetch())\nexcept:\n print("error")\n\nelapsed = time.perf_counter() - s\n# print(f"{__file__} executed in {elapsed:0.2f} seconds.")\nRun Code Online (Sandbox Code Playgroud)\n回溯(最近一次调用最后一次):
\nFile "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\connector.py", line 986, in _wrap_create_connection\n return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\base_events.py", line 1056, in create_connection\n raise exceptions[0]\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\base_events.py", line 1041, in create_connection\n sock = await self._connect_sock(\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\base_events.py", line 955, in _connect_sock\n await self.sock_connect(sock, address)\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\proactor_events.py", line 702, in sock_connect\n return await self._proactor.connect(sock, address)\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\tasks.py", line 328, in __wakeup\n future.result()\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\windows_events.py", line 812, in _poll\n value = callback(transferred, key, ov)\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\windows_events.py", line 599, in finish_connect\n ov.getresult()\nOSError: [WinError 121] The semaphore timeout period has expired\n\nThe above exception was the direct cause of the following exception:\n\nTraceback (most recent call last):\n File "C:\\Users\\SGM\\Desktop\\xnet\\x3stackoverflow.py", line 136, in <module>\n loop.run_until_complete(fetch())\n File "C:\\Users\\SGM\\AppData\\Roaming\\Python\\Python39\\site-packages\\nest_asyncio.py", line 81, in run_until_complete\n return f.result()\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\futures.py", line 201, in result\n raise self._exception\n File 
"C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\tasks.py", line 256, in __step\n result = coro.send(None)\n File "C:\\Users\\SGM\\Desktop\\xnet\\x3stackoverflow.py", line 88, in fetch\n response = await f\n File "C:\\Users\\SGM\\Desktop\\xnet\\x3stackoverflow.py", line 37, in _wait_for_one\n return f.result()\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\futures.py", line 201, in result\n raise self._exception\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\asyncio\\tasks.py", line 258, in __step\n result = coro.throw(exc)\n File "C:\\Users\\SGM\\Desktop\\xnet\\x3stackoverflow.py", line 125, in do_get\n async with session.get(url + str(x), headers=headers) as response:\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\client.py", line 1138, in __aenter__\n self._resp = await self._coro\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\client.py", line 535, in _request\n conn = await self._connector.connect(\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\connector.py", line 542, in connect\n proto = await self._create_connection(req, traces, timeout)\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\connector.py", line 907, in _create_connection\n _, proto = await self._create_direct_connection(req, traces, timeout)\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\connector.py", line 1206, in _create_direct_connection\n raise last_exc\n File "C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\connector.py", line 1175, in _create_direct_connection\n transp, proto = await self._wrap_create_connection(\n File 
"C:\\Users\\SGM\\AppData\\Local\\Programs\\Python\\Python39\\lib\\site-packages\\aiohttp\\connector.py", line 992, in _wrap_create_connection\n raise client_error(req.connection_key, exc) from exc\naiohttp.client_exceptions.ClientConnectorError: Cannot connect to host example.com:80 ssl:default [The semaphore timeout period has expired]\nRun Code Online (Sandbox Code Playgroud)\n
aar*_*ron 15
首先,瓶颈是 TCP 连接器中同时连接的总数。
aiohttp.TCPConnector 的默认值是 limit=100。在大多数系统上(在 macOS 上测试过),您应该能够通过传入 connector=aiohttp.TCPConnector(limit=200) 将其加倍:
# async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=200)) as session:
Run Code Online (Sandbox Code Playgroud)
所花费的时间应该会显著减少。(在 macOS 上:q = 20_000 时从 58 秒减少到 33 秒,减少了 43%;q = 10_000 时从 31 秒减少到 18 秒,减少了 42%。)
您可以配置的 limit 数值取决于您的计算机可以打开的文件描述符数量。(在 macOS 上:可以运行 ulimit -n 查看当前限制,运行 ulimit -n 1024 将当前终端会话的限制提高到 1024,然后把代码改为 limit=1000。与 limit=100 相比,q = 20_000 的耗时减少了 76% 至 14 秒,q = 10_000 减少了 71% 至 9 秒。)
接下来,5000 万个请求看似挂起的原因仅仅是因为其数量庞大。
仅创建 post_tasks 中的 1000 万个协程就需要 68-98 秒(在我的机器上差异很大),之后事件循环还要承受如此多的任务,其中 99.99% 都被 TCP 连接池阻塞。
我们可以使用异步生成器推迟协程的创建:
async def make_async_gen(f, n, q):
    """Lazily yield the coroutines ``f(x)`` for each ``x`` in ``range(n, q)``.

    Deferring coroutine creation avoids materialising tens of millions of
    coroutine objects up front (which alone took 68-98 s in the answer's
    measurements).  ``range`` is used directly here: the question's
    ``make_numbers`` was only an async wrapper around ``range`` and added
    nothing.
    """
    for x in range(n, q):
        yield f(x)
Run Code Online (Sandbox Code Playgroud)
我们需要一个与 asyncio.as_completed() 对应的函数来处理 async_gen 和 concurrency:
from asyncio import ensure_future, events
from asyncio.queues import Queue


def as_completed_for_async_gen(fs_async_gen, concurrency):
    """Like ``asyncio.as_completed()``, but fed by an *async generator* of
    awaitables, keeping at most ``concurrency`` of them in flight.

    Yields awaitables; awaiting each one returns the result of the next
    future to complete.  NOTE(review): priming uses
    ``loop.run_until_complete()``, so this generator must be iterated from
    outside a running event loop (or with ``nest_asyncio`` applied, as in
    the question's setup).
    """
    done = Queue()      # completed futures, in completion order
    todo = set()        # futures currently in flight
    loop = events.get_event_loop()
    exhausted = False   # True once fs_async_gen has been fully consumed

    async def _refill():
        # Top the in-flight set up to `concurrency`, pulling lazily from
        # the async generator so coroutines exist only while needed.
        nonlocal exhausted
        while not exhausted and len(todo) < concurrency:
            try:
                coro = await fs_async_gen.__anext__()
            except StopAsyncIteration:
                exhausted = True
                return
            fut = ensure_future(coro, loop=loop)
            fut.add_done_callback(_on_completion)
            todo.add(fut)

    def _on_completion(fut):
        todo.remove(fut)
        done.put_nowait(fut)

    async def _wait_for_one():
        # Replenish *before* waiting so the pipeline never drains early.
        await _refill()
        fut = await done.get()
        return fut.result()

    # Prime the first batch of up to `concurrency` tasks.
    loop.run_until_complete(_refill())

    # Keep yielding until the generator is exhausted AND nothing is in
    # flight AND every completion has been consumed.  The original
    # `while todo:` could exit early and drop results still queued in
    # `done` — e.g. when several futures complete in one loop pass, or
    # with concurrency == 1.
    while not (exhausted and not todo and done.empty()):
        yield _wait_for_one()
Run Code Online (Sandbox Code Playgroud)
然后,我们更新fetch():
from functools import partial

CONCURRENCY = 200  # +

n = 0
q = 50_000_000


async def fetch():
    """Issue GETs for ids in [n, q) with at most CONCURRENCY in flight."""
    # example
    url = "https://httpbin.org/anything/log?id="

    # Match the connector limit to CONCURRENCY so the pool, not the task
    # list, is the throttle.
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=CONCURRENCY)) as session:
        # post_tasks = []                                # -
        # # prepare the coroutines that post             # -
        # async for x in make_numbers(n, q):             # -
        #     post_tasks.append(do_get(session, url, x)) # -
        # Prepare the coroutines generator               # +
        async_gen = make_async_gen(partial(do_get, session, url), n, q)  # +

        # now execute them all at once                   # -
        # Now execute them with a specified concurrency  # +
        responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]  # +
Run Code Online (Sandbox Code Playgroud)
通过以上内容,程序可以开始处理 5000 万个请求,但是:
即使 CONCURRENCY = 1000,根据 tqdm 的估计仍需要 8 小时左右;此外,把 5000 万条结果累积到 responses 列表会耗尽内存并导致崩溃。对于第 2 点,您可能应该这样做:
# responses = [await f for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q)]
for f in tqdm.tqdm(as_completed_for_async_gen(async_gen, CONCURRENCY), total=q):
response = await f
# Do something with response, such as writing to a local file
# ...
Run Code Online (Sandbox Code Playgroud)
do_get()应该return data:
async def do_get(session, url, x):
    """GET ``url + str(x)`` via *session* and return the response body text.

    Returning the data — instead of print()-ing it as in the question —
    lets the caller decide what to do with each response (e.g. write it
    to a local file).
    """
    headers = {
        # NOTE(review): 'Access-Control-Allow-Origin' is a *response*
        # header; sending it in a request is harmless but pointless —
        # confirm it can be dropped.
        'Content-Type': "application/x-www-form-urlencoded",
        'Access-Control-Allow-Origin': "*",
        'Accept-Encoding': "gzip, deflate",
        'Accept-Language': "en-US"
    }

    async with session.get(url + str(x), headers=headers) as response:
        data = await response.text()
        return data
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
9903 次 |
| 最近记录: |