多线程Python应用程序和套接字连接的问题

Vac*_*ube 8 python sockets multithreading

我正在研究在带有4G内存的Ubuntu机器上运行的Python应用程序的问题.该工具将用于审核服务器(我们更喜欢使用自己的工具).它使用线程连接到许多服务器,并且许多TCP连接失败.但是,如果我在开始每个线程之间添加1秒的延迟,那么大多数连接都会成功.我用这个简单的脚本来调查可能发生的事情:

#!/usr/bin/python

import sys
import socket
import threading
import time

class Scanner(threading.Thread):
    def __init__(self, host, port):
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.status = ""

    def run(self):
        self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sk.settimeout(20)
        try:
            self.sk.connect((self.host, self.port))
        except Exception, err:
            self.status = str(err)
        else:
            self.status = "connected"
        finally:
            self.sk.close()


def get_hostnames_list(filename):
    return open(filename).read().splitlines()

if (__name__ == "__main__"):
    hostnames_file = sys.argv[1]
    hosts_list = get_hostnames_list(hostnames_file)
    threads = []
    for host in hosts_list:
        #time.sleep(1)
        thread = Scanner(host, 443)
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()
        print "Host: ", thread.host, " : ", thread.status
Run Code Online (Sandbox Code Playgroud)

如果我在time.sleep(1)注释掉的情况下执行此操作,比方说,300个主机中的许多连接都会因超时错误而失败,而如果我将延迟时间设置为1秒,它们就不会超时.我确实试过了在另一台运行在功能更强大的机器上的Linux发行版上的应用程序并没有那么多的连接错误?是由于内核限制吗?有没有什么可以让连接工作而不会延迟?

UPDATE

我还尝试了一个限制池中可用线程数的程序.通过将其减少到20,我可以使所有连接工作,但它只检查大约1个主机每秒.所以无论我尝试什么(放入睡眠(1)或限制并发线程的数量),我似乎无法每秒检查多于1个主机.

UPDATE

我刚刚发现这个问题与我所看到的相似.

UPDATE

我想知道用扭曲写这个可能会有帮助吗?任何人都可以展示我的例子看起来像使用扭曲写的?

jfs*_*jfs 5

你可以尝试gevent:

from gevent.pool import Pool    
from gevent import monkey; monkey.patch_all() # patches stdlib    
import sys
import logging    
from httplib import HTTPSConnection
from timeit import default_timer as timer    
info = logging.getLogger().info

def connect(hostname):
    info("connecting %s", hostname)
    h = HTTPSConnection(hostname, timeout=2)
    try: h.connect()
    except IOError, e:
        info("error %s reason: %s", hostname, e)
    else:
        info("done %s", hostname)
    finally:
        h.close()

def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")    
    info("getting hostname list")
    hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
    hosts_list = open(hosts_file).read().splitlines()    
    info("spawning jobs")
    pool = Pool(20) # limit number of concurrent connections
    start = timer()
    for _ in pool.imap(connect, hosts_list):
        pass
    info("%d hosts took us %.2g seconds", len(hosts_list), timer() - start)

if __name__=="__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

它每秒可以处理多个主机.

产量

2011-01-31 11:08:29,052 getting hostname list
2011-01-31 11:08:29,052 spawning jobs
2011-01-31 11:08:29,053 connecting www.yahoo.com
2011-01-31 11:08:29,053 connecting www.abc.com
2011-01-31 11:08:29,053 connecting www.google.com
2011-01-31 11:08:29,053 connecting stackoverflow.com
2011-01-31 11:08:29,053 connecting facebook.com
2011-01-31 11:08:29,054 connecting youtube.com
2011-01-31 11:08:29,054 connecting live.com
2011-01-31 11:08:29,054 connecting baidu.com
2011-01-31 11:08:29,054 connecting wikipedia.org
2011-01-31 11:08:29,054 connecting blogspot.com
2011-01-31 11:08:29,054 connecting qq.com
2011-01-31 11:08:29,055 connecting twitter.com
2011-01-31 11:08:29,055 connecting msn.com
2011-01-31 11:08:29,055 connecting yahoo.co.jp
2011-01-31 11:08:29,055 connecting taobao.com
2011-01-31 11:08:29,055 connecting google.co.in
2011-01-31 11:08:29,056 connecting sina.com.cn
2011-01-31 11:08:29,056 connecting amazon.com
2011-01-31 11:08:29,056 connecting google.de
2011-01-31 11:08:29,056 connecting google.com.hk
2011-01-31 11:08:29,188 done www.google.com
2011-01-31 11:08:29,189 done google.com.hk
2011-01-31 11:08:29,224 error wikipedia.org reason: [Errno 111] Connection refused
2011-01-31 11:08:29,225 done google.co.in
2011-01-31 11:08:29,227 error msn.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,228 error live.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,250 done google.de
2011-01-31 11:08:29,262 done blogspot.com
2011-01-31 11:08:29,271 error www.abc.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,465 done amazon.com
2011-01-31 11:08:29,467 error sina.com.cn reason: [Errno 111] Connection refused
2011-01-31 11:08:29,496 done www.yahoo.com
2011-01-31 11:08:29,521 done stackoverflow.com
2011-01-31 11:08:29,606 done youtube.com
2011-01-31 11:08:29,939 done twitter.com
2011-01-31 11:08:33,056 error qq.com reason: timed out
2011-01-31 11:08:33,057 error taobao.com reason: timed out
2011-01-31 11:08:33,057 error yahoo.co.jp reason: timed out
2011-01-31 11:08:34,466 done facebook.com
2011-01-31 11:08:35,056 error baidu.com reason: timed out
2011-01-31 11:08:35,057 20 hosts took us 6 seconds
Run Code Online (Sandbox Code Playgroud)


orl*_*rlp 3

真正的线程池怎么样?

#!/usr/bin/env python3

# http://code.activestate.com/recipes/577187-python-thread-pool/

from queue import Queue
from threading import Thread

class Worker(Thread):
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try: func(*args, **kargs)
            except Exception as exception: print(exception)
            self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads): Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        self.tasks.join()
Run Code Online (Sandbox Code Playgroud)

例子:

import threadpool
pool = threadpool.ThreadPool(20) # 20 threads
pool.add_task(print, "test")
pool.wait_completion()
Run Code Online (Sandbox Code Playgroud)

它使用 python 3,但转换到 2.x 应该不会太难。如果这解决了您的问题,我并不感到惊讶。