Spu*_*nik 3 python web-scraping python-asyncio python-3.7
我目前正在设计一个蜘蛛来抓取特定的网站。我可以同步完成,但我正在尝试了解 asyncio 以使其尽可能高效。我尝试了很多不同的方法,yield
但chained functions
我queues
无法使其发挥作用。
我最感兴趣的是设计部分和解决问题的逻辑。不是必需的可运行代码,而是强调 assyncio 最重要的方面。我无法发布任何代码,因为我的尝试不值得分享。
使命:
example.com(我知道,应该是 example.com)有以下设计:
以同步方式,逻辑将是这样的:
for table in my_url_list:
# Get HTML
# Extract urls from HTML to user_list
for user in user_list:
# Get HTML
# Extract urls from HTML to user_subcat_list
for subcat in user_subcat_list:
# extract content
Run Code Online (Sandbox Code Playgroud)
但现在我想异步抓取网站。假设我们使用 5 个实例(pyppeteer 中的选项卡或 aiohttp 中的请求)来解析内容。我们应该如何设计它以使其最高效以及我们应该使用什么 asyncio 语法?
感谢@user4815162342解决了我的问题。我一直在研究他的解决方案,如果其他人想使用 asyncio,我会在下面发布可运行的代码。
import asyncio
import random
my_url_list = ['exemple.com/table1', 'exemple.com/table2', 'exemple.com/table3']
# Random sleeps to simulate requests to the server
async def randsleep(caller=None):
i = random.randint(1, 6)
if caller:
print(f"Request HTML for {caller} sleeping for {i} seconds.")
await asyncio.sleep(i)
async def process_urls(url_list):
print(f'async def process_urls: added {url_list}')
limit = asyncio.Semaphore(5)
coros = [process_user_list(table, limit) for table in url_list]
await asyncio.gather(*coros)
async def process_user_list(table, limit):
async with limit:
# Simulate HTML request and extracting urls to populate user_list
await randsleep(table)
if table[-1] == '1':
user_list = ['exemple.com/user1', 'exemple.com/user2', 'exemple.com/user3']
elif table[-1] == '2':
user_list = ['exemple.com/user4', 'exemple.com/user5', 'exemple.com/user6']
else:
user_list = ['exemple.com/user7', 'exemple.com/user8', 'exemple.com/user9']
print(f'async def process_user_list: Extracted {user_list} from {table}')
# Execute process_user in parallel, but do so outside the `async with`
# because process_user will also need the semaphore, and we don't need
# it any more since we're done with fetching HTML.
coros = [process_user(user, limit) for user in user_list]
await asyncio.gather(*coros)
async def process_user(user, limit):
async with limit:
# Simulate HTML request and extracting urls to populate user_subcat_list
await randsleep(user)
user_subcat_list = [user + '/profile', user + '/info', user + '/followers']
print(f'async def process_user: Extracted {user_subcat_list} from {user}')
coros = [process_subcat(subcat, limit) for subcat in user_subcat_list]
await asyncio.gather(*coros)
async def process_subcat(subcat, limit):
async with limit:
# Simulate HTML request and extracting content
await randsleep(subcat)
print(f'async def process_subcat: Extracted content from {subcat}')
if __name__ == '__main__':
asyncio.run(process_urls(my_url_list))
Run Code Online (Sandbox Code Playgroud)
让我们重构同步代码,以便每个可以访问网络的部分都位于单独的函数中。功能没有改变,但以后会让事情变得更容易:
def process_urls(url_list):
for table in url_list:
process_user_list(table)
def process_user_list(table):
# Get HTML, extract user_list
for user in user_list:
process_user(user)
def process_user(user):
# Get HTML, extract user_subcat_list
for subcat in user_subcat_list:
process_subcat(subcat)
def process_subcat(subcat):
# get HTML, extract content
if __name__ == '__main__':
process_urls(my_url_list)
Run Code Online (Sandbox Code Playgroud)
假设处理顺序并不重要,我们希望异步版本能够并行运行现在在for
循环中调用的所有函数。它们仍然会在单个线程上运行,但它们会await
阻止任何可能阻塞的事情,从而允许事件循环并行化等待,并通过在准备好继续时恢复每个协程来驱动它们完成。这是通过将每个协程生成为单独的任务来实现的,该任务独立于其他任务运行,因此是并行的。例如,顺序(但仍然异步)版本process_urls
将如下所示:
async def process_urls(url_list):
for table in url_list:
await process_user_list(table)
Run Code Online (Sandbox Code Playgroud)
这是异步的,因为它在事件循环内运行,并且您可以并行运行多个此类函数(我们将很快展示如何执行),但它也是顺序的,因为它选择await
每次调用process_user_list
. 在每次循环迭代时,await
显式指示 asyncio 暂停执行,process_urls
直到结果process_user_list
可用。
我们想要的是告诉 asyncio 并行运行所有调用process_user_list
,并暂停执行process_urls
直到它们全部完成。在“后台”生成协程的基本原语是使用来将其安排为任务asyncio.create_task
,这是最接近的轻量级线程的异步等效项。使用create_task
并行版本process_urls
将如下所示:
async def process_urls(url_list):
# spawn a task for each table
tasks = []
for table in url_list:
asyncio.create_task(process_user_list(table))
tasks.append(task)
# The tasks are now all spawned, so awaiting any one task lets
# them all run.
for task in tasks:
await task
Run Code Online (Sandbox Code Playgroud)
乍一看,第二个循环看起来像以前的版本一样按顺序等待任务,但事实并非如此。由于每个任务都await
挂起到事件循环,等待任何任务都允许所有任务继续进行,只要它们事先使用create_task()
. 总等待时间不会长于最长任务的时间,无论它们完成的顺序如何。
这种模式使用得非常频繁,因此 asyncio 有一个专用的实用函数,即asyncio.gather
。使用此函数,可以用更短的版本来表达相同的代码:
async def process_urls(url_list):
coros = [process_user_list(table) for table in url_list]
await asyncio.gather(*coros)
Run Code Online (Sandbox Code Playgroud)
但还有另一件事需要注意:因为process_user_list
将从服务器获取 HTML,并且会有许多并行运行的实例,我们不能允许它用数百个同时连接来攻击服务器。我们可以创建一个工作任务池和某种队列,但 asyncio 提供了一个更优雅的解决方案:信号量。信号量是一种同步设备,不允许并行激活超过预定数量,从而使其余激活排队等待。
最终版本process_urls
创建了一个信号量并将其传递下去。它不会激活信号量,因为process_urls
它本身并不实际获取任何 HTML,因此它没有理由在process_user_list
s 运行时保留信号量槽。
async def process_urls(url_list):
limit = asyncio.Semaphore(5)
coros = [process_user_list(table, limit) for table in url_list]
await asyncio.gather(*coros)
Run Code Online (Sandbox Code Playgroud)
process_user_list
看起来很相似,但它确实需要使用以下命令激活信号量async with
:
async def process_user_list(table, limit):
async with limit:
# Get HTML using aiohttp, extract user_list
# Execute process_user in parallel, but do so outside the `async with`
# because process_user will also need the semaphore, and we don't need
# it any more since we're done with fetching HTML.
coros = [process_user(user, limit) for user in user_list]
await asyncio.gather(*coros)
Run Code Online (Sandbox Code Playgroud)
process_user
并且process_subcat
更多相同:
async def process_user(user, limit):
async with limit:
# Get HTML, extract user_subcat_list
coros = [process_subcat(subcat, limit) for subcat in user_subcat_list]
await asyncio.gather(*coros)
def process_subcat(subcat, limit):
async with limit:
# get HTML, extract content
# do something with content
if __name__ == '__main__':
asyncio.run(process_urls(my_url_list))
Run Code Online (Sandbox Code Playgroud)
在实践中,您可能希望异步函数共享相同的 aiohttp 会话,因此您可能会在顶级函数中创建它(process_urls
在您的情况下)并将其与信号量一起传递。每个获取 HTML 的函数都会有另一个async with
用于 aiohttp 请求/响应,例如:
async with limit:
async with session.get(url, params...) as resp:
# get HTML data here
resp.raise_for_status()
resp = await resp.read()
# extract content from HTML data here
Run Code Online (Sandbox Code Playgroud)
这两个async with
s 可以合二为一,减少缩进但保持相同的含义:
async with limit, session.get(url, params...) as resp:
# get HTML data here
resp.raise_for_status()
resp = await resp.read()
# extract content from HTML data here
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1127 次 |
最近记录: |