How to process the rows of a pandas DataFrame in parallel in Python

Tho*_*ius 5 python multiprocessing pandas

An illustrative dummy example:

I have a DataFrame df:

> df

       para0  para1   para2
0  17.439020   True    high
1  19.757758   True    high
2  12.434424   True  medium
3  14.789654   True     low
4  14.131464  False    high
5   9.900233   True    high
6  10.977869  False     low
7   8.004251   True  medium
8  11.468420  False     low
9  12.764453  False    high

where each row contains a set of parameters for the function foobar:

def foobar(r):
    """ r is a row of df, does something, and it takes a long time"""
    if r.para1:
        x = r.para2
    else:
        x = 'low'
    return int(r.para0), (r.Index+13)%3 == 0, x

I want to apply foobar to each row of df, collect the results, and store them in a DataFrame together with their associated parameters.

My (current) solution:

df['count'] = 0
df['valid'] = False
df['outpt'] = ''

# Note: .ix was removed in pandas 1.0; on current versions use .loc instead.
def wrapper(r, df):
    c, v, o = foobar(r)
    df.ix[r.Index,'count'] = c
    df.ix[r.Index,'valid'] = v
    df.ix[r.Index,'outpt'] = o

for r in df.itertuples():
    wrapper(r, df)

This yields:

> df
       para0  para1   para2  count  valid   outpt
0  17.439020   True    high   17.0  False    high
1  19.757758   True    high   19.0  False    high
2  12.434424   True  medium   12.0   True  medium
3  14.789654   True     low   14.0  False     low
4  14.131464  False    high   14.0  False     low
5   9.900233   True    high    9.0   True    high
6  10.977869  False     low   10.0  False     low
7   8.004251   True  medium    8.0  False  medium
8  11.468420  False     low   11.0   True     low
9  12.764453  False    high   12.0  False     low

Here is my problem:

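In real life, the function foobar is computationally expensive and takes roughly 20-30 minutes to run, and df typically has 100-2000 rows. I have access to a multi-core machine, and since foobar depends only on the row currently being processed and on nothing else, running these computations in parallel should be straightforward.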

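It would also be nice not to have to start from scratch if something goes wrong (for example, if someone accidentally switches off the machine), i.e. to skip the rows that have already been processed.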

How can I do this?

Unfortunately, my attempt with multiprocessing failed:

from multiprocessing import Pool

pool = Pool(3)
results = []

for r in df.itertuples():
    results += [pool.apply_async(wrapper, r, df)]

with:

> results[0].get()
…
/usr/lib/python3.5/multiprocessing/reduction.py in dumps(cls, obj, protocol)
     48     def dumps(cls, obj, protocol=None):
     49         buf = io.BytesIO()
---> 50         cls(buf, protocol).dump(obj)
     51         return buf.getbuffer()
     52

PicklingError: Can't pickle <class 'pandas.core.frame.Pandas'>: attribute lookup Pandas on pandas.core.frame failed

Here is how I created the toy DataFrame:

import pandas as pd
import numpy as np

# The np.float alias was removed in NumPy 1.24; the built-in float and bool
# types are used as dtypes here instead of np.float and np.bool.
df = pd.DataFrame({
    'para0' : pd.Series(
        np.random.gamma(12,size=10),
        dtype=float),
    'para1' : pd.Series(
        [(True,False)[i] for i in np.random.randint(0,2,10)],
        dtype=bool),
    'para2' : pd.Categorical(
        [('low','medium','high')[i] for i in np.random.randint(0,3,10)],
        ordered=True),
    })

小智 2

I don't know whether this helps, but try using a list instead of itertuples.

I mean something like this:

df_list = [[x[0], x[1],x[2]] for x in df.itertuples()]
for r in df_list:
    results += [pool.apply_async(wrapper, r, df)]
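To expand on the idea of passing plain data to the workers, here is a minimal sketch, not a definitive implementation. It rests on three points. First, `itertuples(name=None)` yields regular tuples, which pickle without trouble. Second, `apply_async` expects its positional arguments as a single tuple. Third, worker processes only see copies of the data, so assigning into `df` inside a worker never reaches the parent process. For these reasons the sketch returns the results and lets the parent assemble them. The names `foobar_row`, `process`, `map_func`, `RESULT_COLUMNS`, and the checkpoint CSV file are hypothetical, and `foobar_row` is only a stand-in for the real `foobar`, rewritten to accept a tuple. Each finished row is appended to the CSV file, and rows already recorded there are skipped on a restart.

```python
import csv
import os
import tempfile
from multiprocessing import Pool

import pandas as pd

RESULT_COLUMNS = ['index', 'count', 'valid', 'outpt']


def foobar_row(row):
    """Hypothetical stand-in for the expensive foobar.

    `row` is a plain tuple (index, para0, para1, para2).
    """
    idx, para0, para1, para2 = row
    x = para2 if para1 else 'low'
    # Return the index too, so each result can be matched back to its row.
    return idx, int(para0), (idx + 13) % 3 == 0, x


def process(df, checkpoint, map_func=map):
    """Apply foobar_row to every row of df that is not yet in `checkpoint`.

    map_func defaults to the built-in map (sequential); pass
    pool.imap_unordered to spread the rows over worker processes.
    """
    has_data = os.path.exists(checkpoint) and os.path.getsize(checkpoint) > 0
    done = set(pd.read_csv(checkpoint)['index']) if has_data else set()
    # name=None makes itertuples yield regular tuples, which pickle fine,
    # instead of instances of the dynamically created 'Pandas' class.
    rows = df[['para0', 'para1', 'para2']].itertuples(name=None)
    todo = [row for row in rows if row[0] not in done]
    with open(checkpoint, 'a', newline='') as fh:
        writer = csv.writer(fh)
        if not has_data:
            writer.writerow(RESULT_COLUMNS)
        for result in map_func(foobar_row, todo):
            writer.writerow(result)
            fh.flush()  # push each finished row out to the file right away
    # The parent assembles the results; workers only ever see copies of data.
    return df.join(pd.read_csv(checkpoint, index_col='index'))


if __name__ == '__main__':
    df = pd.DataFrame({
        'para0': [17.439020, 19.757758, 12.434424],
        'para1': [True, True, False],
        'para2': ['high', 'high', 'medium'],
    })
    # A temporary directory keeps the demo tidy; a real run would use a
    # persistent path so that a restart can pick up the earlier results.
    with tempfile.TemporaryDirectory() as tmp, Pool(3) as pool:
        path = os.path.join(tmp, 'results.csv')
        print(process(df, path, pool.imap_unordered))
```

Because `imap_unordered` hands results back as soon as they are ready, the order in the CSV file may differ from the row order. The final `join` aligns everything on the index, so this does not matter. With the default `map_func=map` the same function runs sequentially, which is handy for debugging before switching to a pool.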