Asyncio,由于哨兵问题,任务没有正确完成

use*_*631 5 python-3.x async-await python-asyncio

我正在尝试使用预定义数量的工作人员进行一些网络抓取,作为学习。

我使用Noneas 作为哨兵来打破 while 循环并停止工作人员。

每个worker的速度各不相同,在最后一个url传递gather_search_links到获取链接之前,所有worker都被关闭。

我尝试使用asyncio.Queue,但与 deque 相比,我的控制力更小。

async def gather_search_links(html_sources, detail_urls):
    while True:
        if not html_sources:
            await asyncio.sleep(0)
            continue

        data = html_sources.pop()
        if data is None:
            html_sources.appendleft(None)
            break
        data = BeautifulSoup(data, "html.parser")
        result = data.find_all("div", {"data-component": "search-result"})
        for record in result:
            atag = record.h2.a
            url = f'{domain_url}{atag.get("href")}'
            detail_urls.appendleft(url)
        print("apended data", len(detail_urls))
        await asyncio.sleep(0)


async def get_page_source(urls, html_sources):
    client = httpx.AsyncClient()
    while True:
        if not urls:
            await asyncio.sleep(0)
            continue

        url = urls.pop()
        print("url", url)
        if url is None:
            urls.appendleft(None)
            break

        response = await client.get(url)
        html_sources.appendleft(response.text)
        await asyncio.sleep(8)
    html_sources.appendleft(None)


async def navigate(urls):
    for i in range(2, 7):
        url = f"https://www.example.com/?page={i}"
        urls.appendleft(url)
        await asyncio.sleep(0)
    nav_urls.appendleft(None)


loop = asyncio.get_event_loop()
nav_html = deque()
nav_urls = deque()
products_url = deque()

navigate_workers = [asyncio.ensure_future(navigate(nav_urls)) for _ in range(1)]
page_source_workers = [asyncio.ensure_future(get_page_source(nav_urls, nav_html)) for _ in range(2)]
product_urls_workers = [asyncio.ensure_future(gather_search_links(nav_html, products_url)) for _ in range(1)]
workers = asyncio.wait([*navigate_workers, *page_source_workers, *product_urls_workers])

loop.run_until_complete(workers)
Run Code Online (Sandbox Code Playgroud)

The*_*unk 2

我是个新手,所以这可能是错误的,但我相信问题在于所有三个函数:navigate()、gather_search_links() 和 get_page_source() 都是异步任务,可以以任何顺序完成。但是,您检查空双端队列并使用appendleft来确保 None 是双端队列中最左边的项目,看起来它们会适当地防止这种情况。出于所有意图和目的,代码看起来应该正确运行。

我认为问题出现在这一行:

workers = asyncio.wait([*navigate_workers, *page_source_workers, *product_urls_workers])
Run Code Online (Sandbox Code Playgroud)

根据这篇文章, asyncio.wait 函数不会根据上面编写的顺序对这些任务进行排序,而是根据 IO 作为协程来触发它们。同样,您在 Gather_search_links 和 get_page_source 开头的检查确保一个函数在另一个函数之后运行,因此如果每个函数只有一个工作人员,则此代码应该可以工作。如果每个函数有多个工作人员,我可以看到出现的问题,即 None 最终不会成为双端队列中最左边的项目。也许每个函数末尾的打印语句显示双端队列的内容对于解决此问题很有用。

我想我的主要问题是,如果您要编写额外的代码,为什么要异步执行这些任务,因为这些步骤必须同步完成?为了获取 HTML,您必须首先拥有 URL。为了抓取 HTML,您必须首先拥有 HTML。asyncio 在这里提供什么好处?对我来说,所有这三个任务作为同步任务更有意义。获取 URL、获取 HTML、抓取 HTML,并按此顺序进行。

编辑:我想到这里异步代码的主要好处是,当您从每个 URL 获取 HTML 时,您不必等待每个 URL 同步响应。在这种情况下我会做的是同步收集我的 URL首先同步收集 URL ,然后将 get 和 scrape 函数组合成一个异步函数,这将是您唯一的异步函数。然后,您不需要哨兵或检查“None”值或任何额外代码,并且您可以获得异步获取的完整值。然后,您可以将抓取的数据存储在 future 列表(或双端队列或其他)中。这将简化您的代码并为您提供尽可能最快的抓取时间。

最后编辑:这是我快速而肮脏的重写。我喜欢你的代码,所以我决定自己做。我不知道它是否有效,我不是Python人。

import asyncio
from collections import deque

import httpx as httpx
from bs4 import BeautifulSoup

# Get or build URLs from config
def navigate():
    urls = deque()
    for i in range(2, 7):
        url = f"https://www.example.com/?page={i}"
        urls.appendleft(url)
    return urls

# Asynchronously fetch and parse data for a single URL
async def fetchHTMLandParse(url):

    client = httpx.AsyncClient()
    response = await client.get(url)
    data = BeautifulSoup(response.text, "html.parser")
    result = data.find_all("div", {"data-component": "search-result"})
    for record in result:
        atag = record.h2.a
        #Domain URL was defined elsewhere
        url = f'{domain_url}{atag.get("href")}'
        products_urls.appendleft(url)


loop = asyncio.get_event_loop()
products_urls = deque()

nav_urls = navigate()
fetch_and_parse_workers = [asyncio.ensure_future(fetchHTMLandParse(url)) for url in nav_urls]
workers = asyncio.wait([*fetch_and_parse_workers])

loop.run_until_complete(workers)
Run Code Online (Sandbox Code Playgroud)