Jos*_*des 2 python web-scraping python-asyncio python-requests-html
客观的:
我正在尝试同时抓取多个网址。我不想同时发出太多请求,因此我使用此解决方案来限制它。
问题:
正在为所有任务发出请求,而不是一次针对有限数量的任务。
精简代码:
async def download_all_product_information():
# TO LIMIT THE NUMBER OF CONCURRENT REQUESTS
async def gather_with_concurrency(n, *tasks):
semaphore = asyncio.Semaphore(n)
async def sem_task(task):
async with semaphore:
return await task
return await asyncio.gather(*(sem_task(task) for task in tasks))
# FUNCTION TO ACTUALLY DOWNLOAD INFO
async def get_product_information(url_to_append):
url = 'https://www.amazon.com.br' + url_to_append
print('Product Information - Page ' + str(current_page_number) + ' for category ' + str(
category_index) + '/' + str(len(all_categories)) + ' in ' + gender)
source = await get_source_code_or_content(url, should_render_javascript=True)
time.sleep(random.uniform(2, 5))
return source
# LOOP WHERE STUFF GETS DONE
for current_page_number in range(1, 401):
for gender in os.listdir(base_folder):
all_tasks = []
# check all products in the current page
all_products_in_current_page = open_list(os.path.join(base_folder, gender, category, current_page))
for product_specific_url in all_products_in_current_page:
current_task = asyncio.create_task(get_product_information(product_specific_url))
all_tasks.append(current_task)
await gather_with_concurrency(random.randrange(8, 15), *all_tasks)
async def main():
await download_all_product_information()
# just to make sure there are not any problems caused by two event loops
if asyncio.get_event_loop().is_running(): # only patch if needed (i.e. running in Notebook, Spyder, etc)
import nest_asyncio
nest_asyncio.apply()
# for asynchronous functionality
if __name__ == '__main__':
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
我究竟做错了什么?谢谢!
错误的是这一行:
current_task = asyncio.create_task(get_product_information(product_specific_url))
Run Code Online (Sandbox Code Playgroud)
当您创建“任务”时,它会立即安排执行。一旦您的代码让步执行 asyncio 循环(在任何“await”表达式处),asyncio 将循环执行您的所有任务。
在您指出的原始片段中,信号量也保护任务本身的创建,确保一次只有“n”个任务处于活动状态。该片段中传递的内容gather_with_concurrency
是协同例程。
与任务不同,协同例程是准备等待但尚未调度的对象。它们可以像任何其他对象一样免费传递 - 它们仅在等待或被任务包装时才会执行(然后当代码将控制传递给 asyncio 循环时)。
在您的代码中,您将通过调用创建协同例程get_product_information
,并立即将其包装在任务中。在await
调用自身的那行指令中gather_with_concurrency
,它们都是同时运行的。
修复方法很简单:此时不要创建任务,只需在信号量保护的代码内创建任务即可。仅将原始协同例程添加到您的列表中:
...
all_coroutines = []
# check all products in the current page
all_products_in_current_page = open_list(os.path.join(base_folder, gender, category, current_page))
for product_specific_url in all_products_in_current_page:
current_coroutine = get_product_information(product_specific_url)
all_coroutines.append(current_coroutine)
await gather_with_concurrency(random.randrange(8, 15), *all_coroutines)
Run Code Online (Sandbox Code Playgroud)
这段代码中仍然存在一个不相关的错误,会导致并发失败:您正在对time.sleep
inside进行同步调用gather_product_information
。这将使 asyncio 循环停止在此时,直到睡眠结束。正确的做法是使用await asyncio.sleep(...)
.
归档时间: |
|
查看次数: |
2838 次 |
最近记录: |