Asked by coa*_*tum · score 0 · tags: python, multiprocessing, python-multiprocessing
I have a function and a list of jobs:
```python
jobs = [[(2, 'dog'), None],
        [(-1, 'cat'), (0,)],
        [(-1, 'Bob'), (1,)],
        [(7, 'Alice'), None],
        [(0, 'spam'), (2, 3)]]
```
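I want to apply the function to the arguments (the first tuple) in parallel, while respecting the dependencies on earlier jobs (the second tuple, which holds the indices of the jobs that must finish first). For example, the cat job can't start until the dog job has finished. However, I don't want to tie up a core just waiting for a job's dependencies to finish. Instead, I'd like to keep running other jobs that can be executed right away, so that all cores stay busy whenever possible. Any tips? Many thanks!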
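The comment posted by Charchit Agarwal could be one way to go. The problem is: if a job has multiple dependencies, and those dependencies are completed in different processes, how do these "super" functions communicate with one another? So here is a different approach, which uses a job-completion callback to submit new jobs as soon as their dependencies have completed.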
I'll first process your `jobs` list to build the following:
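- `starts_immediately`: a list of job numbers (that is, indices into `jobs`) that can be submitted right away because they have no dependencies.
- `depends_on`: a dictionary of sets. Each key is a job number, and its value is the set of jobs that must complete before that job can be submitted.
- `precedes`: a dictionary of sets. Each key is a job number, and its value is the set of job numbers that cannot start until that job has completed.

Then, every time a job completes, we determine which jobs (if any) can now be submitted. For this we use a job-completion callback: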
```python
DEBUG = True

def worker(tpl):
    import time
    print('Starting work on:', tpl, flush=True)
    time.sleep(.5)  # Simulate work being done
    ...
    print('Completed work on:', tpl, flush=True)

def main(jobs):
    from multiprocessing import Pool
    from collections import defaultdict
    from functools import partial
    from threading import Event

    starts_immediately = []
    depends_on = {}
    precedes = defaultdict(set)
    for job_number, job in enumerate(jobs):
        _, dependency = job
        if dependency is None:
            starts_immediately.append(job_number)
        else:
            depends_on[job_number] = set(dependency)
            for job_number_2 in dependency:
                precedes[job_number_2].add(job_number)

    if DEBUG:
        print('starts _immediately:', starts_immediately)
        print('depends on:', depends_on)
        print('precedes:', precedes)
        print()

    if not jobs:  # Nothing to run; the wait() below would otherwise never return
        return

    jobs_completed = Event()
    jobs_to_complete = len(jobs)
    error = None

    with Pool() as pool:
        # Both callbacks run in the main process, in the pool's result-handler thread.
        def my_error_callback(exc):
            # Without this, an exception raised in worker() would leave
            # jobs_completed.wait() blocked forever.
            nonlocal error
            error = exc
            jobs_completed.set()

        def my_callback(job_number, result):
            nonlocal jobs_to_complete
            jobs_to_complete -= 1
            if jobs_to_complete == 0:  # We have completed all jobs:
                jobs_completed.set()
                return
            for job_number_2 in precedes[job_number]:
                s = depends_on[job_number_2]
                s.remove(job_number)  # This dependency completed
                if not s:  # No more dependencies to wait for:
                    pool.apply_async(worker, args=(jobs[job_number_2][0],),
                                     callback=partial(my_callback, job_number_2),
                                     error_callback=my_error_callback)

        # The jobs we can initially submit to get things rolling:
        for job_number in starts_immediately:
            pool.apply_async(worker, args=(jobs[job_number][0],),
                             callback=partial(my_callback, job_number),
                             error_callback=my_error_callback)
        jobs_completed.wait()  # Wait for all jobs to complete (or one to fail)
        if error is not None:
            raise error

if __name__ == '__main__':
    jobs = [[(2, 'dog'), None],
            [(-1, 'cat'), (0,)],
            [(-1, 'Bob'), (1,)],
            [(7, 'Alice'), None],
            [(0, 'spam'), (2, 3)]]
    main(jobs)
```
Prints:

```
starts _immediately: [0, 3]
depends on: {1: {0}, 2: {1}, 4: {2, 3}}
precedes: defaultdict(<class 'set'>, {0: {1}, 1: {2}, 2: {4}, 3: {4}})

Starting work on: (2, 'dog')
Starting work on: (7, 'Alice')
Completed work on: (2, 'dog')
Completed work on: (7, 'Alice')
Starting work on: (-1, 'cat')
Completed work on: (-1, 'cat')
Starting work on: (-1, 'Bob')
Completed work on: (-1, 'Bob')
Starting work on: (0, 'spam')
Completed work on: (0, 'spam')
```
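For comparison, here is a minimal sketch of the same scheduling idea that keeps all of the bookkeeping in a plain loop in the main process instead of in callbacks. It assumes Python 3.9+ for the standard library's `graphlib.TopologicalSorter`, whose `get_ready()`/`done()` methods take over the role of `starts_immediately`, `depends_on` and `precedes` above, and it swaps in `concurrent.futures.ProcessPoolExecutor` with `wait(..., return_when=FIRST_COMPLETED)` in place of `Pool.apply_async` callbacks. The `worker` body and the `run_jobs` name are hypothetical placeholders, not part of the answer above.

```python
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait
from graphlib import TopologicalSorter  # Python 3.9+


def worker(args):
    # Hypothetical stand-in for the real job function.
    n, name = args
    return f'{name}:{n}'


def run_jobs(jobs, executor):
    """Run each job in `jobs` on `executor` as soon as all of its
    dependencies have finished; return {job_number: result}."""
    # Each job number maps to the set of job numbers it depends on.
    graph = {i: set(deps or ()) for i, (_, deps) in enumerate(jobs)}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # Raises graphlib.CycleError on circular dependencies

    results = {}
    pending = {}  # Future -> job number
    while sorter.is_active():
        # Submit every job whose dependencies have all completed.
        for job_number in sorter.get_ready():
            future = executor.submit(worker, jobs[job_number][0])
            pending[future] = job_number
        # Block only until *some* running job finishes, not a particular one.
        done, _ = wait(list(pending), return_when=FIRST_COMPLETED)
        for future in done:
            job_number = pending.pop(future)
            results[job_number] = future.result()  # Re-raises worker errors
            sorter.done(job_number)  # May make dependent jobs ready
    return results


if __name__ == '__main__':
    jobs = [[(2, 'dog'), None],
            [(-1, 'cat'), (0,)],
            [(-1, 'Bob'), (1,)],
            [(7, 'Alice'), None],
            [(0, 'spam'), (2, 3)]]
    with ProcessPoolExecutor() as executor:
        results = run_jobs(jobs, executor)
    print(dict(sorted(results.items())))
    # {0: 'dog:2', 1: 'cat:-1', 2: 'Bob:-1', 3: 'Alice:7', 4: 'spam:0'}
```

Because the loop blocks only until any running job finishes, a job is submitted as soon as its last dependency completes, and idle workers are never held back by unrelated jobs. `prepare()` raises `graphlib.CycleError` (a `ValueError` subclass) for circular dependencies, and `future.result()` re-raises a worker's exception rather than leaving the main process waiting.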