zen*_*apy 3 python for-loop append multiprocessing python-xarray
我需要并行化 for 循环。我当前的代码是循环遍历从 xarray 数据集中获取的 id 列表,从 xarray 数据集中获取具有当前 id 的行数据,调用函数(计算数据的三角分布),附加结果分布将函数转换为列表,完成后,它将列表转换为 xarray 数据集,其中每个结果都链接到当前的 id,因此稍后可以通过 ID 将这个数据集附加到“主”数据集。
我的代码看起来有点像这样:
from sklearn.preprocessing import MinMaxScaler
import xarray as xr
import scipy.stats as st
function call_func(data):
scaler = MinMaxScaler()
norm_data = scaler.fit_transform(np.reshape(data, (len(data),1)))
params = st.triang.fit(norm_data)
arg,loc,scale = params[:-2],params[-2],params[-1]
dist = st.triang(loc=loc, scale=scale, *arg)
return dist
if __name__ == "__main__":
for id in my_dataset['id'].values:
row_data= my_dataset.sel(id=id)['data'].values[0]
if len(row_data)>3 and all(row_data== 0) == False:
result = call_func(row_data)
result_list.append(result)
else:
result_list.append([])
new_dataset = xr.Dataset({'id': my_dataset['id'].values,
'dist_data':(['id','dist'],
np.reshape(np.array(result_list),(len(result_list),1)))
})
Run Code Online (Sandbox Code Playgroud)
由于 id_array 很大,我想并行化循环。这是一个普遍的问题,但是我对多处理工具很陌生。您有如何将多处理与此任务结合起来的建议吗?我的研究表明,多重处理和附加到列表并不是最明智的做法。
我将尝试给出一个简单的虚拟示例,希望您能够推断出代码所需的修改:
这是代码的常规循环版本:
id_array = [*range(10)]
result = []
for id in id_array:
if id % 2 == 0:
result.append((id, id))
else:
result.append((id, id ** 2))
print(result)
Run Code Online (Sandbox Code Playgroud)
输出:
[(0, 0), (1, 1), (2, 2), (3, 9), (4, 4), (5, 25), (6, 6), (7, 49), ( 8, 8), (9, 81)]
在这里,我使用ProcessPoolExecutor,将其并行化为 4 个进程:
from concurrent.futures import ProcessPoolExecutor
id_array = [*range(10)]
def myfunc(id):
if id % 2 == 0:
return id, id
else:
return id, id ** 2
result = []
with ProcessPoolExecutor(max_workers=4) as executor:
for r in executor.map(myfunc, id_array):
result.append(r)
print(result)
Run Code Online (Sandbox Code Playgroud)
输出(相同):
[(0, 0), (1, 1), (2, 2), (3, 9), (4, 4), (5, 25), (6, 6), (7, 49), ( 8, 8), (9, 81)]
基本上:
for到返回所需值的函数ProcessPoolExecutor与使用executor.map(myfunc, id_array)