如何在Python中关闭线程?

use*_*458 3 python queue multithreading join

我有一些问题,太多线程尚未完成.我认为队列命令.join()只是关闭队列而不是使用它的线程.

在我的脚本中,我需要检查280k域,并为每个域获取他的MX记录列表,并获得服务器的IPv6地址(如果有).

我使用了线程,并感谢他们脚本的速度快了很多倍.但是有一个问题,虽然有加入()队列,活着的线程数量在不断增长,直到错误发生的通知着创建任何新的线程(OS的限制吗?).

当我从数据库中检索新域时,如何在每个For循环后终止/关闭/停止/重置线程?

线程类定义......

class MX_getAAAA_thread(threading.Thread):
    def __init__(self,queue,id_domain):
        threading.Thread.__init__(self)
        self.queue = queue
        self.id_domain = id_domain


    def run(self):
        while True:
            self.mx = self.queue.get()

            res = dns.resolver.Resolver()
            res.lifetime = 1.5
            res.timeout = 0.5

            try:
                answers = res.query(self.mx,'AAAA')
                ip_mx = str(answers[0])
            except:
                ip_mx = "N/A"

            lock.acquire()

            sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()

            print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)

            lock.release()
            self.queue.task_done()
Run Code Online (Sandbox Code Playgroud)

正在使用的线程类...(主要的For循环不在这里,这只是他身体的一部分)

try:
    answers = resolver.query(domain, 'MX')

    qMX = Queue.Queue()
    for i in range(len(answers)):
        t = MX_getAAAA_thread(qMX,id_domain)
        t.setDaemon(True)
        threads.append(t)
        t.start()

    for mx in answers:
        qMX.put(mx.exchange)

    qMX.join()

except NoAnswer as e:
    print "MX - Error: No Answer"
except Timeout as etime:
    print "MX - Error: dns.exception.Timeout"

print "end of script"
Run Code Online (Sandbox Code Playgroud)

我试过了:

for thread in threads:
            thread.join()
Run Code Online (Sandbox Code Playgroud)

队列已完成后,但的Thread.join()从未停止等待,尽管事实,有没有必要等待,因为queue.join()时执行没有什么线程做.

pok*_*oke 5

当我的线程涉及这样的无限循环时,我经常做的是将条件改为我可以从外部控制的东西.例如这样:

def run(self):
    self.keepRunning = True
    while self.keepRunning:
        # do stuff
Run Code Online (Sandbox Code Playgroud)

这样,我可以keepRunning从外部更改属性并将其设置为false,以便在下次检查循环条件时正常终止该线程.

顺便说一句.因为你似乎为你放入队列的每个项目只生成一个线程,你甚至根本不需要让线程循环,尽管我认为你应该总是强制执行可以创建的线程的最大限制这样(即for i in range(min(len(answers), MAX_THREAD_COUNT)):)

替代

在您的情况下,您可以只重用线程,而不是在每个for循环迭代中终止线程.从我从线程的源代码中收集到的东西,所有使迭代独有的线程的是id_domain你在创建时设置的属性.但是,您可以只提供队列,因此线程完全独​​立,您可以重用它们.

这看起来像这样:

qMX = Queue.Queue()
threads = []
for i in range(MAX_THREAD_COUNT):
    t = MX_getAAAA_thread(qMX)
    t.daemon = True
    threads.append(t)
    t.start()

for id_domain in enumerateIdDomains():
    answers = resolver.query(id_domain, 'MX')
    for mx in answers:
        qMX.put((id_domain, mx.exchange)) # insert a tuple

qMX.join()

for thread in threads:
    thread.keepRunning = False
Run Code Online (Sandbox Code Playgroud)

当然,您需要稍微更改一下您的线程:

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        self.keepRunning = True
        while self.keepRunning:
            id_domain, mx = self.queue.get()
            # do stuff
Run Code Online (Sandbox Code Playgroud)