如何有效地将时间戳列表与 Pandas 中的时间戳范围列表进行匹配?

Sal*_*ali 17 python indexing performance dataframe pandas

我有 3 个 Pandas 数据框

df_a = pd.DataFrame(data={
  'id': [1, 5, 3, 2],
  'ts': [3, 5, 11, 14],
  'other_cols': ['...'] * 4
})

df_b = pd.DataFrame(data={
  'id': [2, 1, 3],
  'ts': [7, 8, 15],
  'other_cols': ['...'] * 3
})

df_c = pd.DataFrame(data={
  'id': [154, 237, 726, 814, 528, 237, 248, 514],
  'ts': [1, 2, 4, 6, 9, 10, 12, 13],
  'other_cols': ['...'] * 8
})
Run Code Online (Sandbox Code Playgroud)

这是我需要解决的问题

  • 对于每个idindf_a找到相应的idindf_b及其时间戳。让我们假设ts_ats_b
  • 找到和df_c之间的所有行并计算这些行上的一些自定义函数。该函数可以是 pd 函数(95% 的情况下),但它可以是任何 python 函数。min(ts_a, ts_b)max(ts_a, ts_b)

以下是每个 id (id, ts) 的行示例:

  • 编号 1:[726, 4], [814, 6]
  • 身份证号2:[528, 9], [237, 10], [248, 12], [514, 13]
  • 编号 3:[248, 12], [514, 13]
  • id 5:只能在A中找到,但在B中找不到,所以不应该做任何事情

输出并不重要,因此任何可以映射到的东西都id可以f(rows for that id)完成这项工作。

例如,假设我需要对len结果应用一个简单的函数,我将得到以下结果

ID 资源
1 2
2 4
3 2

如果我的函数是max(ts) - min(ts),结果是:

ID 资源
1 2 = 6 - 4
2 4 = 13 - 9
3 1 = 13 - 12

以下是对数据帧的假设

  • ids每个对应的表中都是唯一的
  • 每个数据帧按以下顺序排序ts
  • 可能存在idin df_a, 不存在 in df_b,反之亦然(但丢失 id 的百分比小于 1% )
  • 表A/B可以是千万级的,表C是上亿级的
  • 虽然理论上时间戳之间可以有任意行数,但经验观察发现中位数为两位数,最大略多于一千

我的工作解决方案

尝试1

  • id -> ts从创建字典df_b。与 df_b 的长度成线性
  • ts, other_cols创建from的排序列表df_c。就 df_c 而言是线性的,因为它已经按 ts 排序
  • 迭代 df_a,然后为每个 id 在字典中找到 ts。然后在排序列表中进行两次二分搜索以找到应该分析的数据的边缘。然后应用该函数

尝试2

  • 将所有数据帧合并为一个并按 ts 排序df = pd.concat([df_a, df_b, df_c]).sort_values(by='ts').reset_index(drop=True)
  • 以滑动窗口方法迭代此数据框,并维护字典seen_ids( id -> index),在其中放置表 A/B 中的 id。如果您在此字典中看到 id,则将df.iloc[index_1:index_2]其过滤为仅 C 中的行并应用该函数

两种尝试都可以正常工作并以对数线性时间运行,但对于我的数据来说,运行需要约 20-30 分钟,这是可以忍受的,但并不理想。除此之外,还存在存储额外数据需要额外内存的问题。

我向 Pandas 专家提出的问题

这可以通过纯 Pandas 实现并且比我的自定义实现更有效吗?

rin*_*ngo 6

这是我的最新尝试。我认为它相当快,但当然速度完全取决于您尝试的表的内容。让我知道它对您有何作用。

合成数据生成:

import random
import pandas as pd


a_len = int(1e7)
c_len = int(1e8)

df_a = pd.DataFrame(data={
  'id': random.sample(population=range(a_len), k=int(a_len * .99)),
  'ts': random.choices(population=range(int(a_len * 10)), k=int(a_len * .99)),
  'other_cols': ['...'] * int(a_len * .99)
})
df_a.sort_values(by=["ts"], inplace=True)

df_b = pd.DataFrame(data={
  'id': random.sample(population=range(a_len), k=int(a_len * .99)),
  'ts': random.choices(population=range(int(a_len * 10)), k=int(a_len * .99)),
  'other_cols': ['...'] * int(a_len * .99)
})
df_b.sort_values(by=["ts"], inplace=True)

df_c = pd.DataFrame(data={
  'id': range(c_len),
  'ts': random.choices(population=range(int(a_len * 1e7)), k=c_len),
  'other_cols': ['...'] * c_len
})
df_c.sort_values(by=["ts"], inplace=True)
Run Code Online (Sandbox Code Playgroud)

这些表的示例生成的一些统计数据是:

size_by_id = df_c_labeled.groupby(by=["id"]).size()

size_by_id.max()
>>> 91

size_by_id.median()
>>> 26.0
Run Code Online (Sandbox Code Playgroud)

该算法利用pandas.IntervalIndex

import functools

import numpy as np
import pandas as pd


def cartesian_product(*arrays):
    """https://stackoverflow.com/a/11146645/7059681"""
    la = len(arrays)
    dtype = np.result_type(*arrays)
    arr = np.empty([len(a) for a in arrays] + [la], dtype=dtype)
    for i, a in enumerate(np.ix_(*arrays)):
        arr[...,i] = a
    return arr.reshape(-1, la).T


# inner join on id
df_ts = pd.merge(
    left=df_a[["id", "ts"]],
    right=df_b[["id", "ts"]],
    how="inner",
    on="id",
    suffixes=["_a", "_b"]
)

# a = min ts, b = max ts
df_ts["ts_a"], df_ts["ts_b"] = (
    df_ts[["ts_a", "ts_b"]].min(axis=1),
    df_ts[["ts_a", "ts_b"]].max(axis=1),
)

a_min = df_ts["ts_a"].min()
b_max = df_ts["ts_b"].max()

interval_index = pd.IntervalIndex.from_arrays(
    left=df_ts["ts_a"],
    right=df_ts["ts_b"],
    closed="both",
)

# rename to avoid collisions
df_c.rename(columns={"id": "id_c", "ts": "ts_c"}, inplace=True)

ts_c = df_c["ts_c"].to_numpy()

df_c_idxs_list, df_ts_idxs_list = [], []

# the first item in ts_c that is at least equal to a_min
c_lo = 0
while ts_c[c_lo] < a_min:
    c_lo += 1
c_idx = c_lo
c_hi = len(ts_c)

while c_lo < c_hi and ts_c[c_lo] <= b_max:
    # the index of the next greatest ts in ts_c
    # depending on how often you many duplicate values you have in ts_c,
    # it may be faster to binary search instead of incrementing one by one
    # c_idx = bisect.bisect_right(a=ts_c, x=ts_c[c_lo], lo=c_idx, hi=c_hi)
    while c_idx < c_hi and ts_c[c_idx] == ts_c[c_lo]:
        c_idx += 1

    # the indicies of the intervals containing ts_c[c_lo]
    unique_ts_idxs = np.where(interval_index.contains(ts_c[c_lo]))[0]

    # all the indicies equal to ts_c[c_lo]
    unique_c_idxs = df_c.iloc[c_lo: c_idx].index
    
    # all the pairs of these indicies
    c_idxs, ts_idxs = cartesian_product(unique_c_idxs, unique_ts_idxs)

    df_c_idxs_list.append(c_idxs)
    df_ts_idxs_list.append(ts_idxs)

    c_lo = c_idx

df_c_idxs = np.concatenate(df_c_idxs_list)
df_ts_idxs = np.concatenate(df_ts_idxs_list)

df_c_labeled = pd.concat(
    [
        df_ts.loc[df_ts_idxs, :].reset_index(drop=True),
        df_c.loc[df_c_idxs, :].reset_index(drop=True)
    ],
    axis=1
)

print(df_c_labeled)
Run Code Online (Sandbox Code Playgroud)
   id  ts_a  ts_b  id_c  ts_c other_cols
0   1     3     8   726     4        ...
1   1     3     8   814     6        ...
2   2     7    14   528     9        ...
3   2     7    14   237    10        ...
4   3    11    15   248    12        ...
5   2     7    14   248    12        ...
6   3    11    15   514    13        ...
7   2     7    14   514    13        ...
Run Code Online (Sandbox Code Playgroud)

现在我们可以做一些groupby事情:

   id  ts_a  ts_b  id_c  ts_c other_cols
0   1     3     8   726     4        ...
1   1     3     8   814     6        ...
2   2     7    14   528     9        ...
3   2     7    14   237    10        ...
4   3    11    15   248    12        ...
5   2     7    14   248    12        ...
6   3    11    15   514    13        ...
7   2     7    14   514    13        ...
Run Code Online (Sandbox Code Playgroud)
id_groupby = df_c_labeled.groupby(by="id")
Run Code Online (Sandbox Code Playgroud)
id
1    2
2    4
3    2
Name: ts_c, dtype: int64
Run Code Online (Sandbox Code Playgroud)
id_groupby["ts_c"].size()
Run Code Online (Sandbox Code Playgroud)
id
1    2
2    4
3    1
Name: ts_c, dtype: int64
Run Code Online (Sandbox Code Playgroud)


sri*_*ath 3

我同意@QuangHong 的观点。处理如此大的数据可能效率不高。

但是,我使用 pandas 尝试了您的示例输入

合并df_adf_b基于id列。确实inner加入了,因为我们需要两个都存在的行

df_merge_a_b = df_a.merge(df_b, on=['id'], how='inner')
Run Code Online (Sandbox Code Playgroud)

求对应行的最小值和最大值

df_merge_a_b["min_ab"] = df_merge_a_b[["ts_x", "ts_y"]].min(axis=1)
df_merge_a_b["max_ab"] = df_merge_a_b[["ts_x", "ts_y"]].max(axis=1)
Run Code Online (Sandbox Code Playgroud)

有了最小值和最大值,对于数据框中的每一行,找到最小值和最大值之间的 id

def get_matching_rows(row):
    min_ab = row["min_ab"]
    max_ab = row["max_ab"]
    result = df_c[df_c["ts"].between(min_ab, max_ab)] 
    print(result)
    ## apply custom function on result and return
    
    
df_merge_a_b.apply(lambda x: get_matching_rows(x), axis=1)
    
Run Code Online (Sandbox Code Playgroud)

样本输出

    id  ts other_cols
2  726   4        ...
3  814   6        ...
    id  ts other_cols
6  248  12        ...
7  514  13        ...
    id  ts other_cols
4  528   9        ...
5  237  10        ...
6  248  12        ...
7  514  13        ...

Run Code Online (Sandbox Code Playgroud)

应用自定义函数并将所有输出连接在一起。

可能不是超级高效..但想尝试一下 pandas 中的示例。