Python:在pandas数据帧上使用多处理

dus*_*tin 29 python multiprocessing pandas

我想multiprocessing在大型数据集上使用以找到两个gps点之间的距离.我构建了一个测试集,但是我无法multiprocessing在这个集合上工作.

import pandas as pd
from geopy.distance import vincenty
from itertools import combinations
import multiprocessing as mp

df = pd.DataFrame({'ser_no': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0],
                'co_nm': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                'lat': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                'lon': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})



def calc_dist(x):
    return pd.DataFrame(
               [ [grp,
                  df.loc[c[0]].ser_no,
                  df.loc[c[1]].ser_no,
                  vincenty(df.loc[c[0], x], 
                           df.loc[c[1], x])
                 ]
                 for grp,lst in df.groupby('co_nm').groups.items()
                 for c in combinations(lst, 2)
               ],
               columns=['co_nm','machineA','machineB','distance'])

if __name__ == '__main__':
    pool = mp.Pool(processes = (mp.cpu_count() - 1))
    pool.map(calc_dist, ['lat','lon'])
    pool.close()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

当发生此错误时,我在Windows7 Professional上使用Python 2.7.11和Ipython 4.1.2与Anaconda 2.5.0 64位.

runfile('C:/.../ Desktop/multiprocessing test.py',wdir ='C:/.../ Desktop')Traceback(最近一次调用last):

文件"",第1行,在runfile中('C:/.../ Desktop/multiprocessing test.py',wdir ='C:/.../ Desktop')

文件"C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py",第699行,在runfile execfile(filename,namespace)中

文件"C:...\Local\Continuum\Anaconda2\lib\site-packages\spyderlib\widgets\externalshell\sitecustomize.py",第74行,在execfile exec中(compile(scripttext,filename,'e​​xec'),glob ,loc)

文件"C:/..../ multiprocessing test.py",第33行,在pool.map中(calc_dist,['lat','lon'])

文件"C:...\AppData\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py",第251行,在map中返回self.map_async(func,iterable,chunksize).get()

文件"C:...\Local\Continuum\Anaconda2\lib\multiprocessing\pool.py",第567行,在get raise self._value中

TypeError:无法从1创建Point实例.

def get(self, timeout=None):
    self.wait(timeout)
    if not self._ready:
        raise TimeoutError
    if self._success:
        return self._value
    else:
        raise self._value
Run Code Online (Sandbox Code Playgroud)

ptr*_*trj 26

怎么了

您的代码中的这一行:

pool.map(calc_dist, ['lat','lon'])
Run Code Online (Sandbox Code Playgroud)

产生2个进程 - 一个运行calc_dist('lat'),另一个运行calc_dist('lon').比较doc中的第一个示例.(基本上,pool.map(f, [1,2,3])调用f在下面的列表中给出的参数三次f(1),f(2)f(3)).如果我没有记错的话,你的函数calc_dist只能叫calc_dist('lat', 'lon').它不允许并行处理.

我相信你想要在进程之间拆分工作,可能会将每个元组发送(grp, lst)到一个单独的进程.以下代码就是这样做的.

首先,让我们准备分裂:

grp_lst_args = list(df.groupby('co_nm').groups.items())

print(grp_lst_args)
[('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
Run Code Online (Sandbox Code Playgroud)

我们将发送这些元组中的每一个(这里,其中有三个)作为单独进程中函数的参数.我们需要重写函数,让我们调用它calc_dist2.为方便起见,它的参数是一个元组calc_dist2(('aa',[0,1,2]))

def calc_dist2(arg):
    grp, lst = arg
    return pd.DataFrame(
               [ [grp,
                  df.loc[c[0]].ser_no,
                  df.loc[c[1]].ser_no,
                  vincenty(df.loc[c[0], ['lat','lon']], 
                           df.loc[c[1], ['lat','lon']])
                 ]
                 for c in combinations(lst, 2)
               ],
               columns=['co_nm','machineA','machineB','distance'])
Run Code Online (Sandbox Code Playgroud)

现在来了多处理:

pool = mp.Pool(processes = (mp.cpu_count() - 1))
results = pool.map(calc_dist2, grp_lst_args)
pool.close()
pool.join()

results_df = pd.concat(results)
Run Code Online (Sandbox Code Playgroud)

results是结果的通话清单(此处数据帧)calc_dist2((grp,lst))(grp,lst)grp_lst_args.results稍后将元素连接到一个数据框.

print(results_df)
  co_nm  machineA  machineB          distance
0    aa         1         2  156.876149391 km
1    aa         1         3  313.705445447 km
2    aa         2         3  156.829329105 km
0    cc         8         9  156.060165391 km
1    cc         8         0  311.910998169 km
2    cc         9         0  155.851498134 km
0    bb         4         5  156.665641837 km
1    bb         4         6  313.214333025 km
2    bb         4         7  469.622535339 km
3    bb         5         6  156.548897414 km
4    bb         5         7  312.957597466 km
5    bb         6         7   156.40899677 km
Run Code Online (Sandbox Code Playgroud)

顺便说一下,在Python 3中我们可以使用一个with结构:

with mp.Pool() as pool:
    results = pool.map(calc_dist2, grp_lst_args)
Run Code Online (Sandbox Code Playgroud)

更新

我只在linux上测试过这段代码.在Linux上,只读数据框df可以被子进程访问,而不是复制到它们的内存空间,但我不确定它在Windows上是如何工作的.您可以考虑拆分df为块(按分组co_nm)并将这些块作为参数发送到其他版本的calc_dist.


Sha*_*tar 6

我编写了一个包,用于在多核上的 Series、DataFrames 和 GroupByDataFrames 上使用 apply 方法。它使得在 Pandas 中进行多处理变得非常容易。

您可以在https://github.com/akhtarshahnawaz/multiprocesspandas查看文档

您也可以直接使用 pip 安装包

pip install multiprocesspandas
Run Code Online (Sandbox Code Playgroud)

然后进行多处理就像导入包一样简单

from multiprocesspandas import applyparallel
Run Code Online (Sandbox Code Playgroud)

然后使用 applyparallel 而不是 apply 之类的

def func(x):
    import pandas as pd
    return pd.Series([x['C'].mean()])

df.groupby(["A","B"]).apply_parallel(func, num_processes=30)
Run Code Online (Sandbox Code Playgroud)