mem*_*elf 200 python parallel-processing
This may be a trivial question, but how do I parallelize the following loop in Python?
# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter=parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)
I know how to start single threads in Python, but I don't know how to "collect" the results.

Multiple processes would be fine too, whatever is easiest for this case. I'm using Linux currently, but the code should run on Windows and Mac as well.

What's the easiest way to parallelize this code?
Sve*_*ach 162
Because of the Global Interpreter Lock (GIL), using multiple threads on CPython won't give you better performance for pure-Python code. I suggest using the multiprocessing module instead:
import multiprocessing

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
Note that this won't work in the interactive interpreter.

To avoid the usual FUD around the GIL: there would be no advantage to using threads for this example anyway. You want processes here, not threads, because they sidestep a whole bunch of problems.
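For reference, a fully runnable version of the snippet above might look like this (offset and calc_stuff are stand-ins, since the question never defines them):

import multiprocessing

offset = 2  # assumed value; the question never defines it

def calc_stuff(parameter):
    # placeholder standing in for the real calculation
    return parameter / 2, parameter % 3, parameter ** 2

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
    print(out1, out2, out3)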
Aar*_*all 40
"What's the easiest way to parallelize this code?"

I really like concurrent.futures for this; it has been available in the Python 3 standard library since version 3.2, and via a backport to 2.6 and 2.7 on PyPI.

You can use threads or processes with the exact same interface.

Put this in a file, futuretest.py:
import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()

    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)

    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish - start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()
And here's the output:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallelizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Now change ProcessPoolExecutor to ThreadPoolExecutor, and run the module again:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallelizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Now you've done both multithreading and multiprocessing!

The sample is too small to compare the results reliably.

However, I suspect that multithreading will generally be faster than multiprocessing here, especially on Windows, since Windows doesn't support forking, so each new process takes time to launch. On Linux or Mac the two will probably be closer.

You can nest multiple threads inside multiple processes, but it's not recommended to use multiple threads to spin off multiple processes.
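Since both executors expose the same interface, the thread-versus-process choice can even be a parameter. A minimal sketch (run_parallel is a hypothetical helper; procedure is the function from the example above):

import concurrent.futures

def run_parallel(executor_cls, fn, inputs):
    # works identically with ThreadPoolExecutor or ProcessPoolExecutor,
    # since both implement the same Executor interface
    with executor_cls() as executor:
        return list(executor.map(fn, inputs))

# results = run_parallel(concurrent.futures.ThreadPoolExecutor, procedure, range(10))
# results = run_parallel(concurrent.futures.ProcessPoolExecutor, procedure, range(10))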
Ham*_*mza 39
You can use asyncio (see the asyncio documentation). It is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, and so on. It also has both high-level and low-level APIs to accommodate any kind of problem.
import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
    return wrapped

@background
def your_function(argument):
    ...  # your code here
Now this function will run in parallel whenever it is called, without putting the main program in a waiting state. You can use it to parallelize a for loop as well: the loop itself stays sequential, but each iteration runs in parallel with the main program as soon as the interpreter reaches it. For example:
import time

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for ' + str(argument))

for i in range(10):
    your_function(i)

print('loop finished')
This produces the following output:
loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1
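The decorator returns asyncio futures, so if you also need the return values (as in the original question) you can collect the futures and wait on them with asyncio.gather. A minimal sketch reusing the background decorator from above (note that asyncio.get_event_loop() is deprecated in recent Python versions, where asyncio.run-based code is preferred):

import asyncio
import time

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
    return wrapped

@background
def your_function(argument):
    time.sleep(1)
    return argument * 2  # placeholder computation

loop = asyncio.get_event_loop()
futures = [your_function(i) for i in range(10)]
results = loop.run_until_complete(asyncio.gather(*futures))
print(results)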
tyr*_*rex 26
from joblib import Parallel, delayed
import multiprocessing

inputs = range(10)
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()
results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)
The above works beautifully on my machine (Ubuntu; the joblib package came preinstalled, but it can be installed via pip install joblib).

Taken from https://blog.dominodatalab.com/simple-parallelization/
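To map this back onto the question's three output lists, zip the tuples that joblib returns. A minimal sketch, with offset and calc_stuff as stand-ins since the question never defines them:

from joblib import Parallel, delayed

offset = 2  # assumed value

def calc_stuff(parameter):
    # placeholder standing in for the real calculation
    return parameter / 2, parameter % 3, parameter ** 2

results = Parallel(n_jobs=-1)(delayed(calc_stuff)(j * offset) for j in range(10))
output1, output2, output3 = (list(t) for t in zip(*results))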
Rob*_*ara 10
Using Ray has a number of advantages.

In your case, you would start Ray and define a remote function:
import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3
and then invoke it in parallel:
output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)
To run the same example on a cluster, the only line that would need to change is the call to ray.init(). The relevant details are in the Ray documentation.
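For example, connecting to an already running cluster typically looks something like this (the exact address value depends on your deployment; "auto" asks Ray to discover a local cluster):

import ray

# connect to an existing cluster instead of starting a new local instance
ray.init(address="auto")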
Note that I'm one of the people helping to develop Ray.
itw*_*kix 10
Dask futures; I'm surprised no one has mentioned it yet...
from dask.distributed import Client

client = Client(n_workers=8)  # in this example I have 8 cores and processes (can also use threads if desired)

def my_function(i):
    output = i ** 2  # placeholder for the code to execute in the for loop
    return output

futures = []
for i in range(10):  # loop across whatever you want here
    future = client.submit(my_function, i)
    futures.append(future)

results = client.gather(futures)
client.close()
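The submit loop can also be collapsed into a single client.map call, which returns the same list of futures:

futures = client.map(my_function, range(10))
results = client.gather(futures)
client.close()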
Thanks @iuryxavier:
from multiprocessing import Pool
from multiprocessing import cpu_count

def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    # keep the input modest; map builds the whole result list in memory
    results = pool.map(add_1, range(10**6))
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the worker processes to exit
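For inputs too large to buffer as a single result list, Pool.imap streams results lazily instead; a chunksize well above 1 keeps the inter-process overhead down. A minimal sketch:

from multiprocessing import Pool, cpu_count

def add_1(x):
    return x + 1

if __name__ == "__main__":
    with Pool(cpu_count()) as pool:
        # imap yields results one at a time instead of building one giant list
        for result in pool.imap(add_1, range(10**6), chunksize=1024):
            pass  # consume each result as it arrives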
I've found joblib very useful for me. See the following example:
from joblib import Parallel, delayed

def yourfunction(k):
    s = 3.14 * k * k
    print("Area of a circle with a radius ", k, " is:", s)

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1, 10))
n_jobs=-1: use all available cores.
The concurrency wrappers from the tqdm library are a nice way to parallelize longer-running code. tqdm gives you feedback on the current progress and the remaining time through a smart progress meter, which I find extremely useful for long computations.

The loop can be rewritten to run as concurrent threads through a simple call to thread_map, or as concurrent multi-processes through a simple call to process_map:
from tqdm.contrib.concurrent import thread_map, process_map

def calc_stuff(num, multiplier):
    import time
    time.sleep(1)
    return num, num * multiplier

if __name__ == "__main__":
    # let's parallelize this for loop:
    # results = [calc_stuff(i, 2) for i in range(64)]

    loop_idx = range(64)
    multiplier = [2] * len(loop_idx)

    # either with threading:
    results_threading = thread_map(calc_stuff, loop_idx, multiplier)

    # or with multi-processing:
    results_processes = process_map(calc_stuff, loop_idx, multiplier)
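Both wrappers pass the usual executor tuning options through alongside tqdm's own keyword arguments, so worker count and chunk size can be adjusted (the values below are illustrative):

results = process_map(calc_stuff, loop_idx, multiplier, max_workers=4, chunksize=8)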
Why not use threads, with one mutex protecting one global list?
from threading import Thread, Lock

offset = 2  # assumed value; the question never defines it

def calc_stuff(parameter):
    # placeholder standing in for the real calculation
    return parameter * 2

class thread_it(Thread):
    def __init__(self, param):
        Thread.__init__(self)
        self.param = param

    def run(self):
        result = calc_stuff(self.param)  # compute outside the lock;
        with mutex:                      # holding the lock around the
            output.append(result)        # computation would serialize it

threads = []
output = []
mutex = Lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

# here you have the output list filled with data
Keep in mind that you'll only be as fast as your slowest thread.