mya*_*cci 2 python parsing multithreading
我有一个python程序,可以顺序解析30,000多个文件.
有没有办法可以将其分解为多个线程(这是正确的术语吗?)并同时解析该文件的块.假设有30个算法,每个算法解析1000个文件.
这很简单.
您可以显式创建30个线程,并为每个线程分配1000个文件名.
但是,更简单的是,您可以创建一个包含30个线程的池,并让它们为具有30000个文件名的线程提供服务.这为您提供了自动负载平衡 - 如果某些文件比其他文件大得多,那么当另一个文件只完成10%时,您将无法完成一个线程.
该concurrent.futures模块为您提供了一种并行执行任务的好方法(包括将参数传递给任务并接收结果,甚至可以根据需要执行异常).如果您使用的是Python 2.x或3.1,则需要安装backport futures.然后你就这样做:
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
results = executor.map(parse_file, filenames)
Run Code Online (Sandbox Code Playgroud)
现在,30名工人可能太多了.你将压倒硬盘驱动器及其驱动程序,最终让大多数线程等待磁盘寻找.但少数可能值得做.调整max_workers和测试时间并查看系统最佳位置的位置非常容易.
如果你的代码比I/O工作做更多的CPU工作 - 也就是说,它花费更多的时间来解析字符串并构建复杂的结构等而不是从磁盘读取 - 那么线程将无济于事,至少在CPython中,因为Global Interpreter Lock.但您可以通过使用流程来解决这个问题.
从代码的角度来看,这是微不足道的:只需ThreadPoolExecutor改为ProcessPoolExecutor.
但是,如果您要返回大型或复杂的数据结构,那么在整个流程边界上序列化它们所花费的时间可能会吞噬甚至压倒您的节省.如果是这种情况,您有时可以通过批量处理更大的工作来改进:
def parse_files(filenames):
return [parse_file(filename) for filename in filenames]
with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
results = executor.map(parse_files, grouper(10, filenames))
Run Code Online (Sandbox Code Playgroud)
但有时您可能需要降低到较低级别并使用multiprocessing具有进程间内存共享等功能的模块.
如果您不能/不想使用futures,则2.6+ multiprocessing.Pool用于普通处理器池,并且在名称下具有相同接口的线程池multiprocessing.ThreadPool(未记录)或multiprocessing.dummy.Pool(记录但丑陋).
在这样一个简单的案例中,普通池和执行器之间确实没有区别.而且,如上所述,在非常复杂的情况下,multiprocessing让你深入了解.在中间,futures往往更简单.但值得学习两者.