使用dask计算移动平均线

sus*_*mit 0 python pandas dask

我正在尝试计算一个非常大的数据集的移动平均值。行数约为30M。使用pandas来说明如下

df = pd.DataFrame({'cust_id':['a', 'a', 'a', 'b', 'b'], 'sales': [100, 200, 300, 400, 500]})
df['mov_avg'] = df.groupby("cust_id")["sales"].apply(lambda x: x.ewm(alpha=0.5, adjust=False).mean())
Run Code Online (Sandbox Code Playgroud)

这里我使用 pandas 来计算移动平均值。使用上面的方法,在 30M 数据集上计算大约需要 20 分钟。有没有办法在这里利用 DASK?

KRK*_*rov 5

您可以使用 Dask.delayed 进行计算。在下面的示例中,包含 pandas 移动平均命令的标准 python 函数使用 @delayed 装饰器转换为 dask 函数。

import pandas as pd
from dask import delayed

@delayed
def mov_average(x):
    x['mov_avg'] = x.groupby("cust_id")["sales"].apply(
                            lambda x: x.ewm(alpha=0.5, adjust=False).mean())
    return x

df = pd.DataFrame({'cust_id':['a', 'a', 'a', 'b', 'b'],
                   'sales': [100, 200, 300, 400, 500]})

df['mov_avg'] = df.groupby("cust_id")["sales"].apply(
                            lambda x: x.ewm(alpha=0.5, adjust=False).mean())

df_1 = mov_average(df).compute()
Run Code Online (Sandbox Code Playgroud)

输出

df
Out[22]: 
  cust_id  sales  mov_avg
0       a    100    100.0
1       a    200    150.0
2       a    300    225.0
3       b    400    400.0
4       b    500    450.0

df_1
Out[23]: 
  cust_id  sales  mov_avg
0       a    100    100.0
1       a    200    150.0
2       a    300    225.0
3       b    400    400.0
4       b    500    450.0
Run Code Online (Sandbox Code Playgroud)

或者,您可以尝试将(或读取文件)转换为 dask 数据帧。调度程序任务的可视化显示了计算的并行化。因此,如果您的数据框足够大,您的计算时间可能会减少。您还可以尝试优化数据框分区的数量。

from dask import dataframe

ddf = dataframe.from_pandas(df, npartitions=3)
ddf['emv'] = ddf.groupby('cust_id')['sales'].apply(lambda x: x.ewm(alpha=0.5, adjust=False).mean()).compute().sort_index()
ddf.visualize()
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

ddf.compute()

        cust_id     sales   emv
    0   a   100     100.0
    1   a   200     150.0
    2   a   300     225.0
    3   b   400     400.0
    4   b   500     450.0
Run Code Online (Sandbox Code Playgroud)