Python/BeautifulSoup抓取中的多线程技术根本没有加速

Kub*_*888 8 parallel-processing multithreading beautifulsoup web-scraping python-2.7

我有一个csv文件("SomeSiteValidURLs.csv"),它列出了我需要抓取的所有链接.代码正在运行,将通过csv中的url,抓取信息并记录/保存在另一个csv文件("Output.csv")中.但是,由于我计划在网站的大部分区域(大于10,000,000页)进行此操作,因此速度非常重要.对于每个链接,爬行并将信息保存到csv大约需要1秒,这对于项目的大小来说太慢了.所以我已经整合了多线程模块,令我惊讶的是它根本没有加速,它仍然需要1个人链接.我做错什么了吗?还有其他方法可以加快处理速度吗?

没有多线程:

import urllib2
import csv
from bs4 import BeautifulSoup
import threading

def crawlToCSV(FileName):

    with open(FileName, "rb") as f:
        for URLrecords in f:

            OpenSomeSiteURL = urllib2.urlopen(URLrecords)
            Soup_SomeSite = BeautifulSoup(OpenSomeSiteURL, "lxml")
            OpenSomeSiteURL.close()

            tbodyTags = Soup_SomeSite.find("tbody")
            trTags = tbodyTags.find_all("tr", class_="result-item ")

            placeHolder = []

            for trTag in trTags:
                tdTags = trTag.find("td", class_="result-value")
                tdTags_string = tdTags.string
                placeHolder.append(tdTags_string)

            with open("Output.csv", "ab") as f:
                writeFile = csv.writer(f)
                writeFile.writerow(placeHolder)

crawltoCSV("SomeSiteValidURLs.csv")
Run Code Online (Sandbox Code Playgroud)

使用多线程:

import urllib2
import csv
from bs4 import BeautifulSoup
import threading

def crawlToCSV(FileName):

    with open(FileName, "rb") as f:
        for URLrecords in f:

            OpenSomeSiteURL = urllib2.urlopen(URLrecords)
            Soup_SomeSite = BeautifulSoup(OpenSomeSiteURL, "lxml")
            OpenSomeSiteURL.close()

            tbodyTags = Soup_SomeSite.find("tbody")
            trTags = tbodyTags.find_all("tr", class_="result-item ")

            placeHolder = []

            for trTag in trTags:
                tdTags = trTag.find("td", class_="result-value")
                tdTags_string = tdTags.string
                placeHolder.append(tdTags_string)

            with open("Output.csv", "ab") as f:
                writeFile = csv.writer(f)
                writeFile.writerow(placeHolder)

fileName = "SomeSiteValidURLs.csv"

if __name__ == "__main__":
    t = threading.Thread(target=crawlToCSV, args=(fileName, ))
    t.start()
    t.join()
Run Code Online (Sandbox Code Playgroud)

dan*_*ano 12

你没有正确地并行化这个.你真正想要做的是让你的for循环中的工作在许多工人中同时发生.现在你将所有的工作都转移到一个后台线程中,它同步完成整个事情.这根本不会改善性能(实际上只会略微伤害它).

这是一个使用ThreadPool来并行化网络操作和解析的示例.尝试一次跨多个线程写入csv文件是不安全的,所以我们将返回已经写回父级的数据,并让父级将所有结果写入文件末尾.

import urllib2
import csv
from bs4 import BeautifulSoup
from multiprocessing.dummy import Pool  # This is a thread-based Pool
from multiprocessing import cpu_count

def crawlToCSV(URLrecord):
    OpenSomeSiteURL = urllib2.urlopen(URLrecord)
    Soup_SomeSite = BeautifulSoup(OpenSomeSiteURL, "lxml")
    OpenSomeSiteURL.close()

    tbodyTags = Soup_SomeSite.find("tbody")
    trTags = tbodyTags.find_all("tr", class_="result-item ")

    placeHolder = []

    for trTag in trTags:
        tdTags = trTag.find("td", class_="result-value")
        tdTags_string = tdTags.string
        placeHolder.append(tdTags_string)

    return placeHolder


if __name__ == "__main__":
    fileName = "SomeSiteValidURLs.csv"
    pool = Pool(cpu_count() * 2)  # Creates a Pool with cpu_count * 2 threads.
    with open(fileName, "rb") as f:
        results = pool.map(crawlToCSV, f)  # results is a list of all the placeHolder lists returned from each call to crawlToCSV
    with open("Output.csv", "ab") as f:
        writeFile = csv.writer(f)
        for result in results:
            writeFile.writerow(result)
Run Code Online (Sandbox Code Playgroud)

请注意,在Python中,线程实际上只会加速I/O操作 - 由于GIL,CPU绑定操作(如解析/搜索BeautifulSoup正在进行)实际上无法通过线程并行完成,因为只有一个线程可以执行一次基于CPU的操作.因此,您仍然可能看不到您希望采用这种方法的速度.当您需要在Python中加速CPU绑定操作时,您需要使用多个进程而不是线程.幸运的是,您可以轻松地看到此脚本如何使用多个进程而不是多个线程执行; 只是换from multiprocessing.dummy import Poolfrom multiprocessing import Pool.无需其他更改.

编辑:

如果你需要将它扩展到10,000,000行的文件,你需要稍微调整一下这个代码 - Pool.map将你传递给它的迭代转换成一个列表,然后再发送给你的工人,这显然不是以10,000,000个参赛名单开展工作; 在记忆中拥有这一切可能会让你的系统陷入困境.将所有结果存储在列表中的问题相同.相反,你应该使用Pool.imap:

imap(func,iterable [,chunksize])

一个更加懒惰的map()版本.

chunksize参数与map()方法使用的参数相同.对于非常长的迭代,使用较大的chunksize值可以使作业比使用默认值1更快地完成.

if __name__ == "__main__":
    fileName = "SomeSiteValidURLs.csv"
    FILE_LINES = 10000000
    NUM_WORKERS = cpu_count() * 2
    chunksize = FILE_LINES // NUM_WORKERS * 4   # Try to get a good chunksize. You're probably going to have to tweak this, though. Try smaller and lower values and see how performance changes.
    pool = Pool(NUM_WORKERS)

    with open(fileName, "rb") as f:
        result_iter = pool.imap(crawlToCSV, f)
    with open("Output.csv", "ab") as f:
        writeFile = csv.writer(f)
        for result in result_iter:  # lazily iterate over results.
            writeFile.writerow(result)
Run Code Online (Sandbox Code Playgroud)

有了imap,我们永远不会将所有内容全部f存入内存,也不会立即将所有结果存储在内存中.我们在记忆中所拥有的最多的是chunksize线条f,应该更易于管理.

  • @ KubiK888是的,`cpu_count()*2`确定并发运行的线程数.增加数量并不会提高性能,因为添加更多线程的回报会减少 - 添加更多线程以加快I/O是有帮助的,但它只会减慢线程正在执行的所有其他操作,因为只有一个可以运行一次.当你有很多线程一次运行一个时,操作系统必须进行大量的上下文切换,以便为每个线程提供CPU时间,最终比使用更少的线程更慢.您甚至可能会发现只使用`cpu_count`会更快. (3认同)