如何并行化一个简单的Python循环?

mem*_*elf 200 python parallel-processing

这可能是一个微不足道的问题,但我如何在python中并行化以下循环?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)
Run Code Online (Sandbox Code Playgroud)

我知道如何在Python中启动单线程,但我不知道如何"收集"结果.

多个过程也可以 - 对于这种情况最简单的事情.我正在使用当前的Linux,但代码应该在Windows和Mac上运行.

并行化此代码的最简单方法是什么?

Sve*_*ach 162

由于全局解释器锁(GIL),在CPython上使用多个线程不会为纯Python代码提供更好的性能.我建议改用multiprocessing模块:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
Run Code Online (Sandbox Code Playgroud)

请注意,这在交互式解释器中不起作用.

为了避免围绕GIL进行通常的FUD:无论如何都没有使用线程的优势.你在这里使用进程,而不是线程,因为它们避免了一大堆问题.

  • 由于这是选择的答案,是否有可能有更全面的例子?`calc_stuff`有什么参数? (23认同)
  • @EduardoPignatelli 请阅读 `multiprocessing` 模块的文档以获得更全面的示例。`Pool.map()` 基本上像 `map()` 一样工作,但是是并行的。 (7认同)
  • 为了避免其他人陷入我刚刚所做的陷阱 - 池的实例化和“pool.map”的调用需要在函数内部:/sf/ask/2309712821/ - 地图调用 (4认同)
  • 有没有一种方法可以简单地在此代码结构中添加tqdm加载栏?我用过tqdm(pool.imap(calc_stuff,range(0,10 * offset,offset))))但我没有完整的加载条形图。 (3认同)
  • 对于 tqdm 加载栏,请参阅此问题:/sf/ask/2934408711/ (2认同)

Gae*_*aux 53

为了并行化一个简单的for循环,joblib为多处理的原始使用带来了很多价值.不仅是短语法,还有迭代的透明聚合,当它们非常快(消除开销)或捕获子进程的回溯时,有更好的错误报告.

免责声明:我是joblib的原作者.

  • 我用 jupyter 试过 joblib,它不工作。并行延迟调用后,页面停止工作。 (2认同)
  • 您好,我在使用 joblib 时遇到问题(/sf/ask/3651660071/),您知道可能是什么原因吗?非常感谢。 (2认同)

Aar*_*all 40

并行化此代码的最简单方法是什么?

我真的很喜欢concurrent.futures这一点,在Python3提供自3.2版本 -并通过反向移植到2.6和2.7上的PyPI.

您可以使用线程或进程并使用完全相同的接口.

把它放在一个文件中 - futuretest.py:

import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

这是输出:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Run Code Online (Sandbox Code Playgroud)

多线程

现在更改ProcessPoolExecutorThreadPoolExecutor,然后再次运行该模块:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Run Code Online (Sandbox Code Playgroud)

现在你已经完成了多线程和多处理!

关于性能的注意事项并一起使用.

采样太小,无法比较结果.

但是,我怀疑多线程通常比多处理更快,特别是在Windows上,因为Windows不支持分配,所以每个新进程都需要花时间启动.在Linux或Mac上,他们可能会更接近.

您可以在多个进程中嵌套多个线程,但建议不要使用多个线程来分离多个进程.


Ham*_*mza 39

这是最简单的方法!

您可以使用asyncio。(文档可以在这里找到)。它被用作多个 Python 异步框架的基础,提供高性能网络和 Web 服务器、数据库连接库、分布式任务队列等。此外,它还具有高级和低级 API 以适应任何类型的问题.

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code
Run Code Online (Sandbox Code Playgroud)

现在这个函数将在每次调用时并行运行,而不会将主程序置于等待状态。您也可以使用它来并行化 for 循环。当调用 for 循环时,虽然循环是顺序的,但是一旦解释器到达那里,每次迭代都会与主程序并行运行。 例如:

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))


for i in range(10):
    your_function(i)


print('loop finished')
Run Code Online (Sandbox Code Playgroud)

这会产生以下输出:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1
Run Code Online (Sandbox Code Playgroud)

  • 谢谢你!我同意这是最简单的方法 (2认同)
  • 很好的例子,有没有办法在最终打印之前等待 - `print('loop finished')` (2认同)
  • 您是否找到了最后打印“循环完成”的方法? (2认同)
  • @Koder101等待最终结果我认为你将其替换为: for i in range(10): your_function(i) 与此: futures = [your_function(i) for i in range(10)] result = wait asyncio.gather( *期货) (2认同)
  • 我能够在 1-2 分钟内运行它。现在我的代码运行得更快了。谢谢你! (2认同)

tyr*_*rex 26

from joblib import Parallel, delayed
import multiprocessing

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)
Run Code Online (Sandbox Code Playgroud)

以上工作在我的机器上很漂亮(Ubuntu,包joblib已经预安装,但可以通过安装pip install joblib).

摘自https://blog.dominodatalab.com/simple-parallelization/

  • 我试过你的代码,但在我的系统上,这段代码的顺序版本大约需要半分钟,上面的并行版本需要 4 分钟。为什么这样? (5认同)
  • 感谢您的回答!我认为这是 2019 年最优雅的方式。 (4认同)
  • @tyrex 感谢分享!这个 joblib 包很棒,这个例子对我有用。不过,不幸的是,在更复杂的环境中我遇到了一个错误。https://github.com/joblib/joblib/issues/949 (2认同)
  • @shaifaliGupta我认为这实际上取决于你的函数processInput为每个样本花费的时间。如果每个 i 的时间很短,你将看不到任何改进。我实际上尝试了代码,看看函数 processInput 是否花费了很少的时间,那么 for 循环实际上执行得更好。但是,如果您的函数 processInput 需要很长时间才能运行。使用这种并行方法要优越得多。 (2认同)
  • 这是可行的,但对于任何尝试在 Windows 上使用它并通过 jupyter 笔记本显示输出的人来说,您将遇到这里的问题 /sf/ask/3916873131/在 jupyter 笔记本中使用 joblib (2认同)

Rob*_*ara 10

使用Ray有许多优点:

  • 除了多个核心(使用相同的代码)之外,您还可以在多台计算机上进行并行化.
  • 通过共享内存(和零拷贝序列化)有效处理数值数据.
  • 分布式调度的高任务吞吐量.
  • 容错.

在您的情况下,您可以启动Ray并定义远程功能

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3
Run Code Online (Sandbox Code Playgroud)

然后并行调用它

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)
Run Code Online (Sandbox Code Playgroud)

要在群集上运行相同的示例,唯一可以更改的行是对ray.init()的调用.相关文档可在此处找到.

请注意,我正在帮助开发Ray.

  • 对于任何考虑 ray 的人来说,了解它本身并不支持 Windows 可能是相关的。可以使用 WSL(Linux 的 Windows 子系统)进行一些修改,让它在 Windows 中工作,但如果您想使用 Windows,它很难是开箱即用的。 (7认同)
  • 遗憾的是它还不支持 Python 3.9。 (3认同)

itw*_*kix 10

达斯克期货;我很惊讶还没有人提到过它。。。

from dask.distributed import Client

client = Client(n_workers=8) # In this example I have 8 cores and processes (can also use threads if desired)

def my_function(i):
    output = <code to execute in the for loop here>
    return output

futures = []

for i in <whatever you want to loop across here>:
    future = client.submit(my_function, i)
    futures.append(future)

results = client.gather(futures)
client.close()
Run Code Online (Sandbox Code Playgroud)


Fel*_*êdo 7

谢谢@iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # 'TERM'
    pool.join()   # 'KILL'
Run Code Online (Sandbox Code Playgroud)

  • -1。这是一个仅限代码的答案。我建议添加一个解释,告诉读者您发布的代码的用途,以及他们可以在哪里找到其他信息。 (7认同)

miu*_*uxu 6

我发现joblib对我很有用。请看下面的例子:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))
Run Code Online (Sandbox Code Playgroud)

n_jobs=-1:使用所有可用的内核

  • 您知道,最好在发布您自己的答案之前检查现有答案。[这个答案](/sf/answers/3564836201/) 也建议使用`joblib`。 (23认同)

w-m*_*w-m 6

tqdm 库并发包装器是并行化长时间运行的代码的好方法。tqdm 通过智能进度表提供有关当前进度和剩余时间的反馈,我发现这对于长时间计算非常有用。

可以通过简单调用将循环重写为并发线程运行thread_map,或者通过简单调用将循环重写为并发多进程运行process_map

from tqdm.contrib.concurrent import thread_map, process_map


def calc_stuff(num, multiplier):
    import time

    time.sleep(1)

    return num, num * multiplier


if __name__ == "__main__":

    # let's parallelize this for loop:
    # results = [calc_stuff(i, 2) for i in range(64)]

    loop_idx = range(64)
    multiplier = [2] * len(loop_idx)

    # either with threading:
    results_threading = thread_map(calc_stuff, loop_idx, multiplier)

    # or with multi-processing:
    results_processes = process_map(calc_stuff, loop_idx, multiplier)
Run Code Online (Sandbox Code Playgroud)


jac*_*doe 5

为什么不使用线程和一个互斥锁来保护一个全局列表?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data
Run Code Online (Sandbox Code Playgroud)

请记住,您将与最慢的线程一样快

  • @skrrgwasme我知道您知道这一点,但是当您使用“它们不会并行化任何东西”这个词时,可能会误导读者。如果操作由于受IO约束或在等待事件时处于睡眠状态而花费很长时间,则解释器将被释放以运行其他线程,因此这将导致人们在这些情况下期望的速度提高。skrrgwasme所说的仅影响受CPU约束的线程。 (3认同)
  • 我知道这是一个非常古老的答案,因此无所事事地获得随机降级票实在令人um舌。我之所以投票,是因为线程不会并行化任何东西。由于全局解释器锁定,Python中的线程一次只能绑定到一个在解释器上执行的线程,因此它们支持[并发编程,但不并行](http://stackoverflow.com/q/1897993/2615940)作为OP正在请求。 (2认同)