提高性能(矢量化?) pandas.groupby.aggregate

Luc*_*rio 5 python pandas pandas-apply pandas-groupby

pandas.groupby.aggregate我正在尝试使用自定义聚合函数来提高操作的性能。我注意到 - 如果我错了,请纠正我 -pandas按顺序调用每个块上的聚合函数(我怀疑它是一个简单的for循环)。

由于pandas很大程度上基于,有没有办法使用 的矢量化特征numpy来加速计算?numpy

我的代码

在我的代码中,我需要将风数据平均样本聚合在一起。虽然平均风速很简单,但平均风向需要更多的临时代码(例如,1 度和 359 度的平均值是 0 度,而不是 180 度)。

我的聚合函数的作用是:

  1. 删除 NaN
  2. 如果不存在其他值则返回 NaN
  3. 检查是否存在指示可变风向的特殊标志。如果是,则返回标志
  4. 使用矢量平均算法平均风向

其功能是:

def meandir(x):
    '''
    Parameters
    ----------
    x : pandas.Series
        pandas series to be averaged

    Returns
    -------
    float
        averaged wind direction
    '''

    # Removes the NaN from the recording
    x = x.dropna()

    # If the record is empty, return NaN
    if len(x)==0:
        return np.nan

    # If the record contains variable samples (990) return variable (990)
    elif np.any(x == 990):
        return 990

    # Otherwise sum the vectors and return the angle
    else:
        angle = np.rad2deg(
                           np.arctan2(
                                   np.sum(np.sin(np.deg2rad(x))),
                                   np.sum(np.cos(np.deg2rad(x)))
                                     )
                          )

        #Wrap angles from (-pi,pi) to (0,360)
        return (angle + 360) % 360
Run Code Online (Sandbox Code Playgroud)

你可以测试它

from timeit import repeat
import pandas as pd
import numpy as np

N_samples = int(1e4)
N_nan = N_var = int(0.02 * N_samples)

# Generate random data
data = np.random.rand(N_samples,2) * [30, 360]
data[np.random.choice(N_samples, N_nan), 1] = np.nan
data[np.random.choice(N_samples, N_var), 1] = 990

# Create dataset
df = pd.DataFrame(data, columns=['WindSpeed', 'WindDir'])
df.index = pd.date_range(start='2000-01-01 00:00', periods=N_samples, freq='10min')

# Run groupby + aggregate
grouped = df.groupby(pd.Grouper(freq='H'))   # Data from 14.30 to 15.29 are rounded to 15.00
aggfuns1 = {'WindSpeed': np.mean, 'WindDir':meandir}
aggfuns2 = {'WindSpeed': np.mean, 'WindDir':np.mean}

res = repeat(stmt='grouped.agg(aggfuns1)', globals=globals(), number=1, repeat=10)
print(f'With custom aggregating function {min(res)*1000:.2f} ms')

res = repeat(stmt='grouped.agg(aggfuns2)', globals=globals(), number=1, repeat=10)
print(f'Without custom aggregating function {min(res)*1000:.2f} ms')
Run Code Online (Sandbox Code Playgroud)

在我的电脑上用于N_samples=1e4输出:

With custom aggregating function 1500.79 ms
Without custom aggregating function 2.08 ms
Run Code Online (Sandbox Code Playgroud)

自定义聚合函数的速度慢 750 倍,N_samples=1e6输出为:

With custom aggregating function 142967.17 ms
Without custom aggregating function 21.92 ms
Run Code Online (Sandbox Code Playgroud)

自定义聚合函数慢了 6500 倍!

有没有办法加快这行代码的速度?

Pie*_*e D 4

关键是尝试将所有可以整体矢量化的东西df,并且groupby只使用内置方法。

\n

这是一种方法。诀窍是将角度转换为复数,numpy 会很乐意对它求和\n(也是groupby,但groupby会拒绝mean())。因此,我们将角度转换为complex, sum,然后\n转换回角度。在您的代码中使用相同的角度“有趣的平均值”,并在您引用的维基百科页面上进行描述。

\n

关于特殊值 ( ) 的处理990,也可以向量化:s.groupby(...).count()与进行比较.replace(val, nan).groupby(...).count(),查找至少存在其中一个的所有组。

\n

无论如何,这里是:

\n
def to_complex(s):\n    return np.exp(np.deg2rad(s) * 1j)\n\ndef to_angle(s):\n    return np.angle(s, deg=True) % 360\n\ndef mask_val(s, grouper, val=990):\n    return s.groupby(grouper).count() != s.replace(val, np.nan).groupby(grouper).count()\n\ndef myagg(df, grouper, val=990, winddir='WindDir'):\n    s = df[winddir]\n    mask = mask_val(s, grouper, val)\n    gb = to_complex(s).groupby(grouper)\n    s = gb.sum()\n    cnt = gb.count()\n    s = to_angle(s) * (cnt / cnt)  # put NaN where all NaNs\n    s[mask] = val\n    \n    # other columns\n    agg = df.groupby(grouper).mean()\n    agg[winddir] = s\n\n    return agg\n
Run Code Online (Sandbox Code Playgroud)\n

应用

\n

为了方便起见,我将您的示例生成放入一个函数中gen_example(N_samples)

\n
df = gen_example(50)\nmyagg(df, pd.Grouper(freq='H'))\n\nOut[ ]:\n                     WindSpeed     WindDir\n2000-01-01 00:00:00  12.991717  354.120464\n2000-01-01 01:00:00  15.743056   60.813629\n2000-01-01 02:00:00  14.593927  245.487383\n2000-01-01 03:00:00  17.836368  131.493675\n2000-01-01 04:00:00  18.987296   27.150359\n2000-01-01 05:00:00  16.415725  194.923399\n2000-01-01 06:00:00  20.881816  990.000000\n2000-01-01 07:00:00  15.033480   44.626018\n2000-01-01 08:00:00  16.276834   29.252459\n
Run Code Online (Sandbox Code Playgroud)\n

速度

\n
df = gen_example(10_000)\n%timeit myagg(df, pd.Grouper(freq='H'))\n\nOut[ ]:\n6.76 ms \xc2\xb1 12.3 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 100 loops each)\n\ndf = gen_example(1e6)\n%timeit myagg(df, pd.Grouper(freq='H'))\n\nOut[ ]:\n189 ms \xc2\xb1 425 \xc2\xb5s per loop (mean \xc2\xb1 std. dev. of 7 runs, 10 loops each)\n
Run Code Online (Sandbox Code Playgroud)\n

测试

\n
idx = [0] * 4\ngrouper = pd.Grouper(level=0)\n\nmyagg(pd.DataFrame({'WindDir': [170, 170, 178, 182]}, index=idx), grouper)\n      WindDir\n0  174.998473\n\nmyagg(pd.DataFrame({'WindDir': [330, 359, 1, 40]}, index=idx), grouper)\n    WindDir\n0  2.251499\n\nmyagg(pd.DataFrame({'WindDir': [330, 359, 1, np.nan]}, index=idx), grouper)\n      WindDir\n0  350.102878\n\nmyagg(pd.DataFrame({'WindDir': [np.nan, np.nan, np.nan, np.nan]}, index=idx), grouper)\n   WindDir\n0      NaN\n\nmyagg(pd.DataFrame({'WindDir': [330, 990, 1, np.nan]}, index=idx), grouper)\n   WindDir\n0    990.0\n
Run Code Online (Sandbox Code Playgroud)\n