在python中并行执行任务

Moh*_*mis 24 python queue parallel-processing multithreading

我使用的是python 2.7,我有一些看起来像这样的代码:

task1()
task2()
task3()
dependent1()

task4()
task5()
task6()
dependent2()

dependent3()
Run Code Online (Sandbox Code Playgroud)

这里唯一的依赖关系如下:dependent1需要等待tasks1-3,dependent2需要等待任务4-6,dependent3需要等待dependents1-2 ......以下就可以了:先运行整个6个任务并行,然后前两个家属并行..然后最终依赖

我喜欢尽可能多地并行运行任务,我已经搜索了一些模块,但我希望避免使用外部库,并且不确定Queue-Thread技术如何解决我的问题(也许有人可以推荐一个好资源) ?)

gec*_*cco 32

内置的threading.Thread类提供了你所需要的一切:开始一个新的线程并加入以等待一个线程的结束.

import threading

def task1():
    pass
def task2():
    pass
def task3():
    pass
def task4():
    pass
def task5():
    pass
def task6():
    pass

def dep1():
    t1 = threading.Thread(target=task1)
    t2 = threading.Thread(target=task2)
    t3 = threading.Thread(target=task3)

    t1.start()
    t2.start()
    t3.start()

    t1.join()
    t2.join()
    t3.join()

def  dep2():
    t4 = threading.Thread(target=task4)
    t5 = threading.Thread(target=task5)

    t4.start()
    t5.start()

    t4.join()
    t5.join()

def dep3():
    d1 = threading.Thread(target=dep1)
    d2 = threading.Thread(target=dep2)

    d1.start()
    d2.start()

    d1.join()
    d2.join()

d3 = threading.Thread(target=dep3)
d3.start()
d3.join()
Run Code Online (Sandbox Code Playgroud)

或者,您可以使用Queue.join等待线程结束.

  • 在SO中首次出现给了我http://stackoverflow.com/questions/1886090/return-value-from-thread (2认同)

Phi*_*itz 5

如果您愿意尝试外部库,您可以使用Ray优雅地表达任务及其依赖关系。这在单机上运行良好,这里的优点是使用 Ray 比使用 python 多处理更容易表达并行性和依赖关系,并且它不存在经常阻止多线程有效工作的 GIL(全局解释器锁)问题。此外,如果将来需要的话,可以很容易地扩展集群上的工作负载。

解决方案如下所示:

import ray

ray.init()

@ray.remote
def task1():
    pass

@ray.remote
def task2():
    pass

@ray.remote
def task3():
    pass

@ray.remote
def dependent1(x1, x2, x3):
    pass

@ray.remote
def task4():
    pass

@ray.remote
def task5():
    pass

@ray.remote
def task6():
    pass

@ray.remote
def dependent2(x1, x2, x3):
    pass

@ray.remote
def dependent3(x, y):
    pass

id1 = task1.remote()
id2 = task2.remote()
id3 = task3.remote()

dependent_id1 = dependent1.remote(id1, id2, id3)

id4 = task4.remote()
id5 = task5.remote()
id6 = task6.remote()

dependent_id2 = dependent2.remote(id4, id5, id6)

dependent_id3 = dependent3.remote(dependent_id1, dependent_id2)

ray.get(dependent_id3) # This is optional, you can get the results if the tasks return an object
Run Code Online (Sandbox Code Playgroud)

您还可以通过使用任务内部的参数并返回结果(例如说“返回值”而不是上面的“传递”),在任务之间传递实际的 python 对象。

使用“pip install ray”,上述代码可以在单台机器上开箱即用,并且也可以轻松在集群上并行化应用程序,无论是在云中还是您自己的自定义集群中,请参阅https://ray.readthedocs。 io/en/latest/autoscaling.htmlhttps://ray.readthedocs.io/en/latest/using-ray-on-a-cluster.html)。如果您的工作量稍后增加,这可能会派上用场。

免责声明:我是 Ray 的开发者之一。