yem*_*emu (24 votes) · Tags: python, multiprocessing, pandas
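I'm trying to use multiprocessing with a pandas DataFrame: split the DataFrame into 8 parts and apply some function to each part using apply, with each part processed in a different process.
EDIT: Here is the solution I eventually found: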
import multiprocessing as mp

import numpy as np
import pandas as pd
import pandas.testing as pdt  # pandas.util.testing was removed in pandas 2.0


def process_apply(x):
    # do some stuff to data here (placeholder: replace with the real per-row computation)
    return x


def process(df):
    res = df.apply(process_apply, axis=1)
    return res


if __name__ == '__main__':
    # big_df is assumed to be loaded before this point
    p = mp.Pool(processes=8)
    split_dfs = np.array_split(big_df, 8)
    pool_results = p.map(process, split_dfs)
    p.close()
    p.join()

    # merging parts processed by different processes
    parts = pd.concat(pool_results, axis=0)

    # merging newly calculated parts to big_df
    big_df = pd.concat([big_df, parts], axis=1)

    # checking if the dfs were merged correctly (same rows, same order)
    pdt.assert_index_equal(parts.index, big_df.index)
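Because Pool.map returns results in the same order as its inputs, concatenating the processed chunks restores the original row order and index. The following is a minimal, self-contained sketch of the question's split/apply/concat approach, checked against a plain serial apply. The row function row_sum and the columns a and b are hypothetical. The split is done on row positions with iloc instead of passing the DataFrame itself to np.array_split, since recent pandas versions may emit a deprecation warning for the latter.

```python
import multiprocessing as mp

import numpy as np
import pandas as pd


def row_sum(row):
    # hypothetical per-row function: combine two columns
    return row["a"] + 2 * row["b"]


def process(chunk):
    # runs in a worker process on one slice of the DataFrame
    return chunk.apply(row_sum, axis=1)


def split_positions(df, n_parts):
    # split row positions (not the DataFrame itself) into n_parts nearly equal groups
    return [df.iloc[idx] for idx in np.array_split(np.arange(len(df)), n_parts)]


def parallel_apply(df, n_parts=4):
    with mp.Pool(processes=n_parts) as pool:
        # map() preserves input order, so concat restores the original row order
        results = pool.map(process, split_positions(df, n_parts))
    return pd.concat(results)


if __name__ == "__main__":
    big_df = pd.DataFrame({"a": range(10), "b": range(10, 20)}, index=range(100, 110))
    parallel = parallel_apply(big_df)
    serial = big_df.apply(row_sum, axis=1)
    pd.testing.assert_series_equal(parallel, serial)
```

The non-default index (100 to 109) is there to show that the original row labels survive the round trip through the workers.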
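Anonymous user (16 votes)
A more general version based on the author's solution, which lets you run it with any function on any DataFrame: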
from functools import partial
from multiprocessing import Pool

import numpy as np
import pandas as pd


def parallelize(data, func, num_of_processes=8):
    # split the data into chunks, process each chunk in its own worker, then reassemble
    data_split = np.array_split(data, num_of_processes)
    pool = Pool(num_of_processes)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data


def run_on_subset(func, data_subset):
    # apply func to every row of one chunk
    return data_subset.apply(func, axis=1)


def parallelize_on_rows(data, func, num_of_processes=8):
    # partial() binds func so each worker only needs to receive its chunk
    return parallelize(data, partial(run_on_subset, func), num_of_processes)
So the following line:
df.apply(some_func, axis=1)
would become:
parallelize_on_rows(df, some_func)
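A usage sketch for this answer's helpers, restated so it runs on its own (with the split done by row position via iloc). It also shows how to pass extra arguments to the row function: wrap it in functools.partial rather than a lambda, because the function has to be pickled to reach the worker processes and lambdas cannot be pickled. The function label_row, its sep parameter and the sample data are hypothetical.

```python
from functools import partial
from multiprocessing import Pool

import numpy as np
import pandas as pd


def parallelize(data, func, num_of_processes=8):
    # split rows by position, process each chunk in its own worker, then reassemble
    chunks = [data.iloc[idx] for idx in np.array_split(np.arange(len(data)), num_of_processes)]
    with Pool(num_of_processes) as pool:
        return pd.concat(pool.map(func, chunks))


def run_on_subset(func, data_subset):
    return data_subset.apply(func, axis=1)


def parallelize_on_rows(data, func, num_of_processes=8):
    # partial() of module-level functions pickles fine; a lambda would not
    return parallelize(data, partial(run_on_subset, func), num_of_processes)


def label_row(row, sep="-"):
    # hypothetical row function with an extra keyword argument
    return f"{row['name']}{sep}{row['qty']}"


if __name__ == "__main__":
    df = pd.DataFrame({"name": ["x", "y", "z", "w"], "qty": [1, 2, 3, 4]})
    result = parallelize_on_rows(df, partial(label_row, sep=":"), num_of_processes=2)
    assert result.equals(df.apply(partial(label_row, sep=":"), axis=1))
```

Under the spawn start method (the default on Windows and macOS), the pool has to be created under an if __name__ == '__main__': guard, as done here.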
Here is some code I found useful. It automatically splits the DataFrame into as many parts as you have CPU cores.
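import multiprocessing as mp

import numpy as np
import pandas as pd

# placeholder names of the input and output columns; set these for your data
column_input = 'input'
column_output = 'output'


def parallelize_dataframe(df, func):
    # one chunk per CPU core
    num_processes = mp.cpu_count()
    df_split = np.array_split(df, num_processes)
    with mp.Pool(num_processes) as p:
        df = pd.concat(p.map(func, df_split))
    return df


def parallelize_function(df):
    # runs in a worker: compute the output column for one chunk
    df[column_output] = df[column_input].apply(example_function)
    return df


def example_function(x):
    x = x * 2
    return x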
To run it:
df_output = parallelize_dataframe(df, parallelize_function)
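The answer above relies on column_input and column_output being module-level globals. Below is a sketch of an alternative that passes the column names explicitly with functools.partial, and caps the number of chunks at the number of rows so no worker receives an empty chunk. The column names x and x2 are illustrative only, and the optional num_processes parameter is an addition to the original function.

```python
import multiprocessing as mp
from functools import partial

import numpy as np
import pandas as pd


def example_function(x):
    return x * 2


def transform_chunk(chunk, column_input, column_output):
    # copy() so assigning a column never modifies a slice of another DataFrame
    chunk = chunk.copy()
    chunk[column_output] = chunk[column_input].apply(example_function)
    return chunk


def parallelize_dataframe(df, func, num_processes=None):
    # default to one chunk per core, but never more chunks than rows
    num_processes = min(num_processes or mp.cpu_count(), max(len(df), 1))
    positions = np.array_split(np.arange(len(df)), num_processes)
    with mp.Pool(num_processes) as pool:
        return pd.concat(pool.map(func, [df.iloc[p] for p in positions]))


if __name__ == "__main__":
    df = pd.DataFrame({"x": [1, 2, 3]})
    worker = partial(transform_chunk, column_input="x", column_output="x2")
    print(parallelize_dataframe(df, worker))
```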
Anonymous user (7 votes)
You can use https://github.com/nalepae/pandarallel, as in the following example:
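from math import sin
import pandas as pd
from pandarallel import pandarallel

pandarallel.initialize()

def func(x):
    # with axis=1, x is a row Series, so use its scalar columns
    return sin(x.a**2) + sin(x.b**2)

df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})  # example data
df.parallel_apply(func, axis=1)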
This worked well for me:
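import multiprocessing

# df and process_apply (a module-level, picklable function) come from the question
rows_iter = (row for _, row in df.iterrows())
with multiprocessing.Pool() as pool:
    df['new_column'] = pool.map(process_apply, rows_iter)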
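The snippet above sends each row to a worker as a separate pandas Series. A variation, offered as a sketch rather than a measured improvement: pass plain dicts from df.to_dict('records') and set the chunksize argument of Pool.map so rows are sent to workers in batches. The function score, the columns a and b, and the default chunksize of 256 are hypothetical choices.

```python
import multiprocessing

import pandas as pd


def score(record):
    # hypothetical per-row computation on a plain dict
    return record["a"] * 10 + record["b"]


def add_scores(df, chunksize=256):
    records = df.to_dict("records")  # one dict per row, in row order
    with multiprocessing.Pool() as pool:
        values = pool.map(score, records, chunksize=chunksize)
    out = df.copy()
    out["new_column"] = values  # map() keeps input order, so values line up with rows
    return out


if __name__ == "__main__":
    df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    print(add_scores(df))
```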
Since I don't have much of your data or script, this is a guess, but I'd suggest using p.map instead of apply_async with a callback.
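import multiprocessing as mp

import numpy as np

# big_df and process are defined as in the question
p = mp.Pool(8)
pool_results = p.map(process, np.array_split(big_df, 8))
p.close()
p.join()

# each result is assumed to be a Series (or list) of per-row values
results = []
for result in pool_results:
    results.extend(result)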
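For contrast with the apply_async approach this answer advises against: apply_async callbacks fire in the order tasks finish, not the order they were submitted, so appending results inside a callback can scramble the chunks. If apply_async is used anyway, one sketch is to keep the AsyncResult handles in submission order and call get() on each. The function process and the value column here are hypothetical.

```python
import multiprocessing as mp

import numpy as np
import pandas as pd


def process(chunk):
    # hypothetical chunk-level work
    return chunk["value"] ** 2


def run_async(df, n_parts=4):
    chunks = [df.iloc[idx] for idx in np.array_split(np.arange(len(df)), n_parts)]
    with mp.Pool(n_parts) as pool:
        # keep the AsyncResult handles in submission order
        handles = [pool.apply_async(process, (chunk,)) for chunk in chunks]
        # get() blocks until each result is ready; iterating the list preserves order
        results = [h.get() for h in handles]
    return pd.concat(results)


if __name__ == "__main__":
    df = pd.DataFrame({"value": range(8)})
    print(run_async(df).tolist())
```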
Views: 15925