对组上的 Expanding().mean() 进行性能调整

Shl*_*rtz 6 python performance pandas

用户事件的 DF 如下:

    id                timestamp
0    1  2021-11-23 11:01:00.000
1    1  2021-11-23 11:02:00.000
2    1  2021-11-23 11:10:00.000
3    1  2021-11-23 11:11:00.000
4    1  2021-11-23 11:22:00.000
5    1  2021-11-23 11:40:00.000
6    1  2021-11-23 11:41:00.000
7    1  2021-11-23 11:42:00.000
8    1  2021-11-23 11:43:00.000
9    1  2021-11-23 11:44:00.000
10   2  2021-11-23 11:01:00.000
11   2  2021-11-23 11:02:00.000
12   2  2021-11-23 11:10:00.000
13   2  2021-11-23 11:11:00.000
14   2  2021-11-23 11:22:00.000
15   2  2021-11-23 11:40:00.000
16   2  2021-11-23 11:41:00.000
17   2  2021-11-23 11:42:00.000
18   2  2021-11-23 11:43:00.000
19   2  2021-11-23 11:44:00.000
Run Code Online (Sandbox Code Playgroud)

我计算每行的平均会话时间如下:

  1. 每个会话都是一系列间隔不超过 5 分钟的事件。
  2. 我计算会话中的第一个事件与当前事件之间的秒数。
  3. 然后我计算每个用户的expanding().mean()。

这是我的代码:

def average_session_time(**kwargs):
    df = kwargs['df'].copy()
    df['timestamp'] = pd.to_datetime(df.timestamp)
    df['session_grp'] = df.groupby('id').apply(
        lambda x: (x.groupby([pd.Grouper(key="timestamp", freq='5min', origin='start')])).ngroup()).reset_index(
        drop=True).values.reshape(-1)

    # Getting relevant 5min groups
    ng = df.groupby(['id', 'session_grp'])
    df['fts'] = ng['timestamp'].transform('first')
    df['delta'] = df['timestamp'].sub(df['fts']).dt.total_seconds()

    return df.groupby('id')['delta'].expanding().mean().reset_index(drop=True)
Run Code Online (Sandbox Code Playgroud)

输出是:

0      0.000000
1     30.000000
2     20.000000
3     15.000000
4     12.000000
5     10.000000
6      8.571429
7     15.000000
8     26.666667
9     42.000000
10     0.000000
11    30.000000
12    20.000000
13    15.000000
14    12.000000
15    10.000000
16     8.571429
17    15.000000
18    26.666667
19    42.000000
Name: delta, dtype: float64
Run Code Online (Sandbox Code Playgroud)

该代码运行良好,但是当它在大型数据集上运行时,性能会受到影响并且需要很长时间来计算。我尝试调整代码,但无法获得更高的性能。如何以不同的方式编写此函数以提高性能?

这是带有运行代码的Colab 。

Jér*_*ard 6

一种非常快速的解决方案是使用NumpyNumba按照您的方式对连续行进行分组。

首先,这些列需要转换为本机 Numpy 类型,因为CPython 对象的计算速度非常慢(并且还占用更多内存)。您可以通过以下方式做到这一点:

ids = df['Id'].values.astype('S32')
timestamps = df['timestamp'].values.astype('datetime64[ns]')
Run Code Online (Sandbox Code Playgroud)

假设 ID 最多由 32 个 ASCII 字符组成。如果 ID 可以包含 unicode 字符,则可以使用'U32'它(速度稍慢)。您还可以使用np.unicode_Numpy 为您找到边界。然而,这要慢得多(因为 Numpy 需要解析所有字符串两次)。

一旦转换为datetime64[ns],时间戳就可以转换为 64 位整数,以便 Numba 进行非常快速的计算。

然后,我们的想法是将字符串 ID 转换为基本整数,因为处理字符串非常慢。您可以通过搜索不同的相邻字符串来找到具有相同 ID 的块来做到这一点:

ids_int = np.insert(np.cumsum(ids[1:] != ids[:-1], dtype=np.int64), 0, 0)
Run Code Online (Sandbox Code Playgroud)

请注意,在提供的数据集中不存在与具有不同 ID 的另一行共享相同 ID 的行集。如果此假设并不总是成立,您可以ids使用 来对输入字符串 ( )进行排序np.argsort(ids, kind='stable'),应用此解决方案,然后根据 的输出对结果重新排序np.argsort。请注意,对字符串进行排序有点慢,但仍然比问题中提供的解决方案的计算时间快得多(在我的机器上大约 100-200 毫秒)。

最后,您可以使用基本循环通过 Numba 计算结果。


完整的解决方案

这是生成的代码:

import pandas as pd
import numpy as np
import numba as nb

@nb.njit('float64[:](int64[::1], int64[::1])')
def compute_result(ids, timestamps):
    n = len(ids)
    result = np.empty(n, dtype=np.float64)
    if n == 0:
        return result
    id_group_first_timestamp = timestamps[0]
    session_group_first_timestamp = timestamps[0]
    id_group_count = 1
    id_group_delta_sum = 0.0
    last_session_group = 0
    result[0] = 0
    delay = np.int64(300e9) # 5 min (in ns)
    for i in range(1, n):
        # If there is a new group of IDs
        if ids[i-1] != ids[i]:
            id_group_first_timestamp = timestamps[i]
            id_group_delta_sum = 0.0
            id_group_count = 1
            last_session_group = 0
            session_group_first_timestamp = timestamps[i]
        else:
            id_group_count += 1
        session_group = (timestamps[i] - id_group_first_timestamp) // delay
        # If there is a new session group
        if session_group != last_session_group:
            session_group_first_timestamp = timestamps[i]
        delta = (timestamps[i] - session_group_first_timestamp) * 1e-9
        id_group_delta_sum += delta
        result[i] = id_group_delta_sum / id_group_count
        last_session_group = session_group
    return result

def fast_average_session_time(df):
    ids = df['Id'].values.astype('S32')
    timestamps = df['timestamp'].values.astype('datetime64[ns]').astype(np.int64)
    ids_int = np.insert(np.cumsum(ids[1:] != ids[:-1], dtype=np.int64), 0, 0)
    return compute_result(ids_int, timestamps)
Run Code Online (Sandbox Code Playgroud)

请注意,输出是 Numpy 数组而不是数据帧,但您可以使用 轻松从 Numpy 数组构建数据帧pd.DataFrame(result)


基准

这是我的机器上的最终性能:

Initial solution:            12_537 ms   (   x1.0)
Scott Boston's solution:      7_431 ms   (   x1.7)
This solution:                   64 ms   ( x195.9)

Time taken by 
compute_result only:              2.5 ms
Run Code Online (Sandbox Code Playgroud)

因此,该解决方案比最初的解决方案快了近200 倍。

请注意,大约85% 的时间花费在很难优化的 unicode/datetime 字符串解析上。事实上,这种处理速度很慢,因为在现代处理器上处理短 unicode 字符串本身就很昂贵,而且 CPython 对象引入了很大的开销(例如引用计数)。此外,由于 CPython GIL 和缓慢的进程间通信,该处理无法有效地并行化。因此,这段代码肯定几乎是最佳的(只要您使用 CPython)。


Sco*_*ton 2

我认为通过使用 pd.to_datetime 中的格式并使用 groupby 中的参数而不是调用 rest_index ,我可以将您的时间减少一半as_index

def average_session_time(**kwargs):
    df = kwargs['df'].copy()
    df['timestamp'] = pd.to_datetime(df.timestamp, format='%Y-%m-%d %H:%M:%S.%f')
    grp_id = df.groupby('Id', as_index=False)
    df['session_grp'] = grp_id.apply(
        lambda x: (x.groupby([pd.Grouper(key="timestamp", freq='5min', origin='start')])).ngroup()).values.reshape(-1)

    # Getting relevant 5min groups
    ng = df.groupby(['Id', 'session_grp'])
    df['fts'] = ng['timestamp'].transform('first')
    df['delta'] = df['timestamp'].sub(df['fts']).dt.total_seconds()

    return grp_id['delta'].expanding().mean().reset_index(level=0, drop=True)
Run Code Online (Sandbox Code Playgroud)

原定时间:

40.228641986846924

新时间:

16.08320665359497