Python多线程搜寻器

tor*_*eff 5 python multithreading web-crawler thread-safety

你好!我正在尝试使用python编写网络爬虫。我想使用python多线程。即使阅读了较早的建议论文和教程,我仍然有问题。我的代码在这里(整个源代码在这里):

class Crawler(threading.Thread):

    global g_URLsDict 
    varLock = threading.Lock()
    count = 0

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.url = self.queue.get()

    def run(self):
        while 1:
            print self.getName()+" started" 
            self.page = getPage(self.url)
            self.parsedPage = getParsedPage(self.page, fix=True)
            self.urls = getLinksFromParsedPage(self.parsedPage)

            for url in self.urls:

                self.fp = hashlib.sha1(url).hexdigest()

                #url-seen check
                Crawler.varLock.acquire() #lock for global variable g_URLs
                if self.fp in g_URLsDict:
                    Crawler.varLock.release() #releasing lock
                else:
                    #print url+" does not exist"
                    Crawler.count +=1
                    print "total links: %d"%len(g_URLsDict)
                    print self.fp
                    g_URLsDict[self.fp] = url
                    Crawler.varLock.release() #releasing lock
                    self.queue.put(url)

                    print self.getName()+ " %d"%self.queue.qsize()
                    self.queue.task_done()
            #self.queue.task_done()
        #self.queue.task_done()


print g_URLsDict
queue = Queue.Queue()
queue.put("http://www.ertir.com")

for i in range(5):
    t = Crawler(queue)
    t.setDaemon(True)
    t.start()

queue.join()
Run Code Online (Sandbox Code Playgroud)

它无法按需运行,在线程1之后不会产生任何结果,并且在某些时间执行不同的操作会出现此错误:

Exception in thread Thread-2 (most likely raised during interpreter shutdown):
Run Code Online (Sandbox Code Playgroud)

我该如何解决?而且我也不认为这比for循环更有效。

我试图修复run():

def run(self):
    while 1:
        print self.getName()+" started" 
        self.page = getPage(self.url)
        self.parsedPage = getParsedPage(self.page, fix=True)
        self.urls = getLinksFromParsedPage(self.parsedPage)

        for url in self.urls:

            self.fp = hashlib.sha1(url).hexdigest()

            #url-seen check
            Crawler.varLock.acquire() #lock for global variable g_URLs
            if self.fp in g_URLsDict:
                Crawler.varLock.release() #releasing lock
            else:
                #print url+" does not exist"
                print self.fp
                g_URLsDict[self.fp] = url
                Crawler.varLock.release() #releasing lock
                self.queue.put(url)

                print self.getName()+ " %d"%self.queue.qsize()
                #self.queue.task_done()
        #self.queue.task_done()
    self.queue.task_done()
Run Code Online (Sandbox Code Playgroud)

我在不同的地方尝试了task_done()命令,有人可以解释其中的区别吗?

Jon*_*age 4

self.url = self.queue.get()您仅在线程初始化时调用。如果您想获取新的 url 以便进一步处理,则需要尝试在 while 循环内从队列中重新获取 url。

尝试替换self.page = getPage(self.url)self.page = getPage(self.queue.get()). 请注意,get 函数将无限期地阻塞。您可能希望在一段时间后超时,并添加某种方式让后台线程根据请求正常退出(这将消除您看到的异常)。

effbot.org 上有一些很好的示例,它们按照我上面描述的方式使用 get() 。

编辑- 对您最初评论的答复:

查看文档task_done();对于每次调用get()(不会超时),您应该调用task_done()它告诉任何阻塞调用join(),该队列上的所有内容现在都已处理。每次调用get()都会在等待新 URL 发布到队列时阻塞(睡眠)。

Edit2 - 尝试这个替代运行功能:

def run(self):
    while 1:
        print self.getName()+" started"
        url = self.queue.get() # <-- note that we're blocking here to wait for a url from the queue
        self.page = getPage(url)
        self.parsedPage = getParsedPage(self.page, fix=True)
        self.urls = getLinksFromParsedPage(self.parsedPage)

        for url in self.urls:

            self.fp = hashlib.sha1(url).hexdigest()

            #url-seen check
            Crawler.varLock.acquire() #lock for global variable g_URLs
            if self.fp in g_URLsDict:
                Crawler.varLock.release() #releasing lock
            else:
                #print url+" does not exist"
                Crawler.count +=1
                print "total links: %d"%len(g_URLsDict)
                print self.fp
                g_URLsDict[self.fp] = url
                Crawler.varLock.release() #releasing lock
                self.queue.put(url)

                print self.getName()+ " %d"%self.queue.qsize()

        self.queue.task_done() # <-- We've processed the url this thread pulled off the queue so indicate we're done with it.
Run Code Online (Sandbox Code Playgroud)