Ell*_*ter 7 python multiprocessing python-multiprocessing
我知道这听起来像是以前被问过的问题,但是等等,我会解释为什么其他选项不起作用。
我目前正在使用multiprocessing.Pool在应用程序中实现并行性,并希望扩展它以能够利用嵌套并行性。Pool仅将对象作为参数传递的天真的方法apply_async不起作用,如其他答案中所述,因为Pool无法进行腌制。
这是我的要求:
我需要某种池来限制并发执行任务的数量。例如,multiprocess.Pool用于此目的,但它不能传递给其他进程。
我需要嵌套并行性。在我的应用程序中,我需要执行 I/O 来识别嵌套工作是什么,所以我绝对不想从单个线程执行此操作。我认为这排除了这个问题的所有答案。
它需要在标准库中;我无法添加依赖项。这就排除了这个答案。
我真的很希望它能够与 Python 2 和 3 一起使用。但是,如果可以证明迁移到 Python 3 可以解决我的问题,我会考虑它。
我不需要专门使用多个进程,使用线程就可以了,因为大部分工作是 I/O 或等待子进程完成。
我尝试过使用multiprocessing.dummy,它是相同的接口,但在threading. 但是,当我尝试调用get()以检索测试结果时,我收到以下错误,所以我认为这已经结束了。
File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
ValueError: signal only works in main thread
Run Code Online (Sandbox Code Playgroud)
我知道concurrent.futuresPython 3 中的库,但这似乎有一些严重的限制。例如,本节中的第二个示例在我的例子中似乎是一个阻碍:
https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor
我不明白如何使用基本上任何直接编写的嵌套并行算法来避免遇到这个问题。因此,即使我愿意使用 Python 3,我也认为这是不可能的。
如果没有编写我自己的实现,我不知道标准库中是否有任何其他可用选项。
您似乎已经排除了这种可能性,但我怀疑 https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor或https://docs.python.org/3/library/concurrent。 futures.html#processpoolexecutor如果您能够迁移到 Python 3,或者添加 Python 2 的依赖项,
如果在处理该文件之前不必触发每个文件的额外工作,则可以使用一个协调线程来触发所有其他线程,这样就可以防止死锁,如下例所示。
from concurrent.futures import ThreadPoolExecutor
import time
pool = ThreadPoolExecutor(max_workers=3)
def find_work_inputs(dummy_file):
print("{}: Finding work...".format(dummy_file))
time.sleep(1)
work = range(0, dummy_file)
print("{}: Work is {}".format(dummy_file, work))
return work
def do_work(dummy_file, work_input):
print("{}: {}".format(dummy_file, work_input))
print("{}: Doing work {}...".format(dummy_file, work_input))
time.sleep(1)
return work_input * work_input
dummy_files = [1,2,3,4,5]
futures = []
for dummy_file in dummy_files:
work_inputs = pool.submit(find_work_inputs, dummy_file)
for work_input in work_inputs.result():
result = work_input
futures.append((dummy_file, result, pool.submit(do_work, dummy_file, result)))
for dummy_file, work_input, future in futures:
print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))
Run Code Online (Sandbox Code Playgroud)
或者,如果第一级的每个线程都需要自己触发工作,则额外的工作可能需要位于另一个池中以防止死锁(取决于result()每个 future 的调用时间),如下所示。
from concurrent.futures import ThreadPoolExecutor
import time
find_work_pool = ThreadPoolExecutor(max_workers=3)
do_work_pool = ThreadPoolExecutor(max_workers=3)
def find_work_inputs(dummy_file):
print("{}: Finding work...".format(dummy_file))
time.sleep(1)
work = range(0, dummy_file)
print("{}: Work is {}".format(dummy_file, work))
futures = []
for work_input in work:
futures.append((dummy_file, work_input, do_work_pool.submit(do_work, dummy_file, work_input)))
return futures
def do_work(dummy_file, work_input):
print("{}: {}".format(dummy_file, work_input))
print("{}: Doing work {}...".format(dummy_file, work_input))
time.sleep(1)
return work_input * work_input
dummy_files = [1,2,3,4,5]
futures = []
for dummy_file in dummy_files:
futures.extend(find_work_pool.submit(find_work_inputs, dummy_file).result())
for dummy_file, work_input, future in futures:
print("Result from file:{} input:{} is {}".format(dummy_file, work_input, future.result()))
Run Code Online (Sandbox Code Playgroud)