dor*_*emi 46 python multithreading python-multithreading
假设我有一个非常大的列表,我正在执行这样的操作:
for item in items:
try:
api.my_operation(item)
except:
print 'error with item'
Run Code Online (Sandbox Code Playgroud)
我的问题有两个方面:
我想使用多线程一次启动一堆api.my_operations,这样我就可以同时处理5个或10个甚至100个项目.
如果my_operation()返回一个异常(因为我可能已经处理过该项) - 那没关系.它不会破坏任何东西.循环可以继续到下一个项目.
注意:这适用于Python 2.7.3
aba*_*ert 89
首先,在Python中,如果您的代码受CPU限制,多线程将无济于事,因为只有一个线程可以保存全局解释器锁,因此一次运行Python代码.所以,你需要使用进程,而不是线程.
如果您的操作"需要永远返回",则不是这样,因为它是IO限制的 - 即等待网络或磁盘副本等.我稍后再说.
接下来,一次处理5个或10个或100个项目的方法是创建一个5或10或100个工作池,并将这些项目放入工作人员服务的队列中.幸运的是,stdlib multiprocessing和concurrent.futures库都包含了大部分细节.
前者对传统节目更强大,更灵活; 如果你需要组成未来等待,后者会更简单; 对于琐碎的案例,你选择哪个并不重要.(在这种情况下,每个最明显的实现需要3行futures,4行multiprocessing.)
如果您使用的是2.6-2.7或3.0-3.1,futures则不是内置的,但您可以从PyPI(pip install futures)安装它.
最后,如果您可以将整个循环迭代转换为函数调用(例如,传递给函数调用map),那么并行化事情通常要简单得多,所以让我们先做:
def try_my_operation(item):
try:
api.my_operation(item)
except:
print('error with item')
Run Code Online (Sandbox Code Playgroud)
把它们放在一起:
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)
Run Code Online (Sandbox Code Playgroud)
如果你有很多相对较小的工作,多处理的开销可能会淹没收益.解决这个问题的方法是将工作批量化为更大的工作.例如(使用grouper从itertools食谱,您可以复制并粘贴到你的代码,或者从一开始more-itertoolsPyPI上的项目):
def try_multiple_operations(items):
for item in items:
try:
api.my_operation(item)
except:
print('error with item')
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group)
for group in grouper(5, items)]
concurrent.futures.wait(futures)
Run Code Online (Sandbox Code Playgroud)
最后,如果你的代码是IO绑定怎么办?然后线程和进程一样好,并且开销更少(限制更少,但在这种情况下,这些限制通常不会影响您).有时"减少开销"足以表示您不需要使用线程进行批处理,但是您使用进程,这是一个不错的胜利.
那么,你如何使用线程而不是进程呢?只需ProcessPoolExecutor改为ThreadPoolExecutor.
如果您不确定您的代码是CPU绑定的还是IO绑定的,那么只需尝试两种方式.
我可以在我的python脚本中为多个函数执行此操作吗?例如,如果我想要并行化的代码中的其他地方有另一个for循环.是否可以在同一个脚本中执行两个多线程函数?
是.实际上,有两种不同的方法可以做到这一点.
首先,您可以共享相同的(线程或进程)执行程序,并在多个位置使用它,没有任何问题.任务和未来的全部意义在于它们是独立的; 你不关心他们在哪里跑,只是你把他们排队并最终得到答案.
或者,您可以在同一程序中有两个执行程序,没有问题.这有一个性能成本 - 如果你同时使用两个执行程序,你最终会尝试在8个内核上运行(例如)16个忙线程,这意味着将会有一些上下文切换.但有时它值得做,因为,比如说,两个执行器很少同时繁忙,它使你的代码更简单.或者,一个执行程序正在运行可能需要一段时间才能完成的非常大的任务,另一个执行程序正在运行需要尽快完成的非常小的任务,因为响应性比部分程序的吞吐量更重要.
如果你不知道哪个适合你的程序,通常是第一个.
woo*_*ing 22
编辑2018-02-06:基于此评论的修订
编辑:忘了提到这适用于Python 2.7.x.
有multiprocesing.pool,以下示例说明了如何使用其中一个:
from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool
pool_size = 5 # your "parallelness"
# define worker function before a Pool is instantiated
def worker(item):
try:
api.my_operation(item)
except:
print('error with item')
pool = Pool(pool_size)
for item in items:
pool.apply_async(worker, (item,))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
现在,如果确实确定您的进程受到@abarnert提到的CPU限制,请将ThreadPool更改为进程池实现(在ThreadPool导入下注释).您可以在此处找到更多详细信息:http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers
Rya*_*ing 10
您可以使用如下方法将处理拆分为指定数量的线程:
import threading
def process(items, start, end):
for item in items[start:end]:
try:
api.my_operation(item)
except Exception:
print('error with item')
def split_processing(items, num_splits=4):
split_size = len(items) // num_splits
threads = []
for i in range(num_splits):
# determine the indices of the list this thread will handle
start = i * split_size
# special case on the last chunk to account for uneven splits
end = None if i+1 == num_splits else (i+1) * split_size
# create the thread
threads.append(
threading.Thread(target=process, args=(items, start, end)))
threads[-1].start() # start the thread we just created
# wait for all threads to finish
for t in threads:
t.join()
split_processing(items)
Run Code Online (Sandbox Code Playgroud)
import numpy as np
import threading
def threaded_process(items_chunk):
""" Your main process which runs in thread for each chunk"""
for item in items_chunk:
try:
api.my_operation(item)
except Exception:
print('error with item')
n_threads = 20
# Splitting the items into chunks equal to number of threads
array_chunk = np.array_split(input_image_list, n_threads)
thread_list = []
for thr in range(n_threads):
thread = threading.Thread(target=threaded_process, args=(array_chunk[thr]),)
thread_list.append(thread)
thread_list[thr].start()
for thread in thread_list:
thread.join()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
68616 次 |
| 最近记录: |