Job queue with dependencies using Python multiprocessing

coa*_*tum 0 python multiprocessing python-multiprocessing

I have a function and a list of jobs:

jobs = [[(2, 'dog'), None],
        [(-1, 'cat'), (0,)], 
        [(-1, 'Bob'), (1,)],
        [(7, 'Alice'), None],
        [(0, 'spam'), (2,3)]]

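I want to apply the function to the arguments (the first tuple) in parallel, while respecting each job's dependencies on earlier jobs (the second tuple, whose entries are indices into jobs). For example, the cat job cannot start until the dog job has finished. However, I don't want to tie up a core just waiting for a job's dependencies to complete. Instead, I want to keep running other jobs that can be executed right away, so that all cores stay busy whenever possible. Any tips? Thanks a lot!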

Boo*_*boo 6

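The approach in Charchit Agarwal's comment may be one way to do it. The problem is: if a job has several dependencies and those dependencies are completed in different processes, how would these "super" functions communicate with each other? So here is another approach, which uses a job-completion callback to submit new jobs as their dependencies complete: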

I would first process your jobs list to build the following:

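  1. starts_immediately: a list of job numbers (i.e. indices into jobs) that can be submitted immediately because they have no dependencies.
  2. depends_on: a dictionary of sets. The key is a job number, and its value is the set of jobs that must complete before that job can be submitted.
  3. precedes: a dictionary of sets. The key is a job number, and its value is the set of job numbers that cannot start until that job has completed.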
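These three structures, and the bookkeeping done each time a job finishes, can be checked on their own without any process pool. The following is a minimal sketch, assuming the question's job format ([args, None or a tuple of job indices]). build_dependency_maps and mark_completed are hypothetical helper names that mirror the preprocessing loop in main and the body of my_callback in the code that follows.

```python
from collections import defaultdict

def build_dependency_maps(jobs):
    """Hypothetical helper: return (starts_immediately, depends_on, precedes).

    Each job is [args, dependencies], where dependencies is None or a
    tuple of indices into jobs, as in the question.
    """
    starts_immediately = []
    depends_on = {}
    precedes = defaultdict(set)
    for job_number, (_, dependency) in enumerate(jobs):
        if dependency is None:
            starts_immediately.append(job_number)
        else:
            depends_on[job_number] = set(dependency)
            for prerequisite in dependency:
                precedes[prerequisite].add(job_number)
    return starts_immediately, depends_on, precedes

def mark_completed(job_number, depends_on, precedes):
    """Hypothetical helper: the callback's bookkeeping, minus the pool.

    Removes job_number from the pending set of every job that depends on it
    and returns (sorted) the jobs that now have no pending prerequisites.
    """
    ready = []
    for dependent in precedes[job_number]:
        pending = depends_on[dependent]
        pending.discard(job_number)
        if not pending:
            ready.append(dependent)
    return sorted(ready)

jobs = [[(2, 'dog'), None],
        [(-1, 'cat'), (0,)],
        [(-1, 'Bob'), (1,)],
        [(7, 'Alice'), None],
        [(0, 'spam'), (2, 3)]]

starts, depends_on, precedes = build_dependency_maps(jobs)
print(starts)                                   # [0, 3]
print(mark_completed(0, depends_on, precedes))  # [1]  dog done, so cat is ready
print(mark_completed(3, depends_on, precedes))  # []   spam still waits on Bob
```

In the real solution, "ready" jobs are handed straight to pool.apply_async instead of being returned.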

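Then we arrange things so that whenever a job completes, we determine which jobs (if any) can now be submitted. To do this, we use a job-completion callback function: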

DEBUG = True

def worker(tpl):
    import time

    print('Starting work on:', tpl, flush=True)
    time.sleep(.5) # Simulate work being done
    ...
    print('Completed work on:', tpl, flush=True)

def main(jobs):
    from multiprocessing import Pool
    from collections import defaultdict
    from functools import partial
    from threading import Event

    starts_immediately = []
    depends_on = {}
    precedes = defaultdict(set)
    for job_number, job in enumerate(jobs):
        _, dependency = job
        if dependency is None:
            starts_immediately.append(job_number)
        else:
            depends_on[job_number] = set(dependency)
            for job_number_2 in dependency:
                precedes[job_number_2].add(job_number)

    if DEBUG:
        print('starts _immediately:', starts_immediately)
        print('depends on:', depends_on)
        print('precedes:', precedes)
        print()

    jobs_completed = Event()

    jobs_to_complete = len(jobs)

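    with Pool() as pool:
        def my_callback(job_number, result):
            # Callbacks run in the pool's single result-handler thread of the
            # main process, so updating these shared objects needs no lock.
            nonlocal jobs_to_complete

            jobs_to_complete -= 1
            if jobs_to_complete == 0: # We have completed all jobs:
                jobs_completed.set()
                return

            for job_number_2 in precedes[job_number]:
                s = depends_on[job_number_2]
                s.remove(job_number) # This dependency completed
                if not s: # No more dependencies to wait for:
                    submit(job_number_2)

        def my_error_callback(exc):
            # A failed job never calls my_callback, so its dependents would
            # never be submitted; stop waiting instead of hanging forever.
            print('A job failed:', repr(exc), flush=True)
            jobs_completed.set()

        def submit(job_number):
            pool.apply_async(worker, args=(jobs[job_number][0],),
                             callback=partial(my_callback, job_number),
                             error_callback=my_error_callback)

        # The jobs we can initially submit to get things rolling:
        for job_number in starts_immediately:
            submit(job_number)
        jobs_completed.wait() # Wait for all jobs to complete (or one to fail)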

if __name__ == '__main__':
    jobs = [[(2, 'dog'), None],
            [(-1, 'cat'), (0,)],
            [(-1, 'Bob'), (1,)],
            [(7, 'Alice'), None],
            [(0, 'spam'), (2,3)]]
    main(jobs)

Prints:

starts _immediately: [0, 3]
depends on: {1: {0}, 2: {1}, 4: {2, 3}}
precedes: defaultdict(<class 'set'>, {0: {1}, 1: {2}, 2: {4}, 3: {4}})

Starting work on: (2, 'dog')
Starting work on: (7, 'Alice')
Completed work on: (2, 'dog')
Completed work on: (7, 'Alice')
Starting work on: (-1, 'cat')
Completed work on: (-1, 'cat')
Starting work on: (-1, 'Bob')
Completed work on: (-1, 'Bob')
Starting work on: (0, 'spam')
Completed work on: (0, 'spam')
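If you would rather keep all of the bookkeeping in the main thread instead of in pool callbacks, one alternative (a swap to concurrent.futures, not what this answer uses) is to loop on concurrent.futures.wait(..., return_when=FIRST_COMPLETED) and submit each dependent as soon as its last prerequisite finishes. This is a minimal sketch, assuming the same jobs format; run_with_dependencies is a hypothetical name and worker is only a stand-in for your real function. The executor is passed in so the same code works with a process or thread pool.

```python
import concurrent.futures as cf
import time

def worker(tpl):
    # Stand-in for the real work (hypothetical), as in the answer above.
    time.sleep(0.1)
    return tpl

def run_with_dependencies(jobs, executor):
    """Submit each job as soon as all of its prerequisites have finished.

    jobs: list of [args, None or tuple of job indices].
    Returns the job numbers in the order they completed. Jobs caught in a
    dependency cycle are never submitted and so are missing from the result.
    """
    # Pending prerequisites per job, and the reverse map (sets avoid
    # double submission if a dependency tuple repeats an index).
    remaining = {n: set(deps or ()) for n, (_, deps) in enumerate(jobs)}
    dependents = {n: set() for n in range(len(jobs))}
    for n, deps in remaining.items():
        for d in deps:
            dependents[d].add(n)

    running = {}  # future -> job number
    finished = []

    def submit(n):
        running[executor.submit(worker, jobs[n][0])] = n

    for n, deps in remaining.items():
        if not deps:
            submit(n)

    while running:
        # Block only until at least one running job finishes.
        done, _ = cf.wait(running, return_when=cf.FIRST_COMPLETED)
        for fut in done:
            n = running.pop(fut)
            fut.result()  # re-raises the worker's exception, if any
            finished.append(n)
            for m in dependents[n]:
                remaining[m].discard(n)
                if not remaining[m]:
                    submit(m)
    return finished
```

In a script you would call it under an if __name__ == '__main__': guard, e.g. with cf.ProcessPoolExecutor() as pool: print(run_with_dependencies(jobs, pool)). Because the scheduling loop runs in the main process, a worker exception surfaces directly from fut.result() instead of leaving the program waiting on an event that is never set.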