使用 Pandas 计算元组最长递增子序列的矢量化或有效方法

Eri*_*Lin 6 python vectorization quantitative-finance pandas subsequence

使用pandas/python,我想计算每组元组的最长递增子序列DTE,但有效地使用13M行。现在,使用 apply/iteration 大约需要 10 个小时。

这大概是我的问题:

DTE 罢工 出价
1 100 10 11
1 200 16 17
1 300 17 18
1 400 11 12
1 500 12 13
1 600 13 14
2 100 10 30
2 200 15 20
2 300 16 21
import pandas as pd
pd.DataFrame({
    'DTE': [1,1,1,1,1,1,2,2,2],
    'Strike': [100,200,300,400,500,600,100,200,300],
    'Bid': [10,16,17,11,12,13,10,15,16],
    'Ask': [11,17,18,12,13,14,30,20,21],
})
Run Code Online (Sandbox Code Playgroud)

我想要:

  • 将这些分组DTE。这里我们有两个组(DTE 1 和 DTE 2)。然后在每个组内...
  • 找到最长的成对递增子序列。排序由 确定Strike,它对于每个 DTE 组都是唯一的。所以 200 Strike 是在 100 Strike 之后。
    • 因此,200 Strike的买价要价必须大于或等于(不严格)100 Strike 买价要价。
    • 中间没有出价和要求都增加价值的任何罢工都将被删除。

在这种情况下,答案是:

DTE 罢工 出价
1 100 10 11
1 400 11 12
1 500 12 13
1 600 13 14
2 200 15 20
2 300 16 21

EACH GROUP 只保留 LONGEST 递增子序列,而不仅仅是任何递增子序列。所有其他行都被删除。

请注意,标准的最长递增子序列算法O(nlogn)不起作用。请参阅https://www.quora.com/How-can-the-SPOJ-problem-LIS2-be-solved了解原因。对于标准 O(nlogn) LIS 解决方案,示例组 DTE 2 值将失败。我目前正在使用 O(n^2) 的标准 LIS 解决方案。有一个更复杂的 O(nlog^2n),但我不认为这是我的瓶颈。

由于每一行都必须引用前几行才能在该点计算最长递增子序列,因此您似乎无法并行执行此操作?这意味着你不能矢量化?这是否意味着加快速度的唯一方法是使用 cython?或者还有其他并发解决方案吗?

我目前的解决方案是这样的:

def modify_lsc_row(row, df, longest_lsc):
    lsc_predecessor_count = 0
    lsc_predecessor_index = -1
    df_predecessors = df[(df['Bid'] <= row.Bid) &
                         (df['Ask'] <= row.Ask) &
                         (df['lsc_count'] != -1)]
    if len(df_predecessors) > 0:
        df_predecessors = df_predecessors[(df_predecessors['lsc_count'] == df_predecessors['lsc_count'].max())]
        lsc_predecessor_index = df_predecessors.index.max()
        lsc_predecessor_count = df_predecessors.at[lsc_predecessor_index, 'lsc_count']

    new_predecessor_count = lsc_predecessor_count + 1
    df.at[row.name, 'lsc_count'] = new_predecessor_count
    df.at[row.name, 'prev_index'] = lsc_predecessor_index

    if new_predecessor_count >= longest_lsc.lsc_count:
        longest_lsc.lsc_count = new_predecessor_count
        longest_lsc.lsc_index = row.name

def longest_increasing_bid_ask_subsequence(df):
    original_columns = df.columns
    df.sort_values(['Strike'], ascending=True, inplace=True)

    df.set_index(['Strike'], inplace=True)
    assert df.index.is_unique

    longest_lsc = LongestLsc()
    longest_lsc.lsc_index = df.index.max()
    longest_lsc.lsc_count = 1

    df['lsc_count'] = -1

    df.apply(lambda row: modify_lsc_row(row, df, longest_lsc),
                              axis=1)

    while longest_lsc.lsc_index != -1:
        df.at[longest_lsc.lsc_index, 'keep'] = True
        longest_lsc.lsc_index = df.at[longest_lsc.lsc_index, 'prev_index']

    df.dropna(inplace=True)


    return df.reset_index()[original_columns]


df_groups = df.groupby(['DTE'], group_keys=False, as_index=False)
df_groups.apply(longest_increasing_bid_ask_subsequence)
Run Code Online (Sandbox Code Playgroud)

更新:https : //stackoverflow.com/users/15862569/alexander-volkovsky 提到我可以使用 pandarallel 来并行化每个 DTE,因为它们都是独立的。这确实将其速度提高了 5 倍左右。但是,我想加快速度(尤其是最长递增子序列的实际优化)。另外,pandaralel 似乎无法使用 pycharm(似乎是一个已知问题https://github.com/nalepae/pandarallel/issues/76

更新:使用/sf/users/1110379861/建议:即 numba、numpy。随着我的事情变得越来越快(可能是由于开销),Pandarallel 实际上减慢了速度。所以去掉那个。10 小时 -> 2.8 分钟。相当成功。一些最大的减速是将 n^2 更改为使用 numba。即使仅用于 numba 函数,也不使用 pandas groupby apply。我发现 groupby+apply == groupby + pd.concat 的时间。并且您可以使用亚历山大所说的删除 pd.concat ,您只需选择最后要保留的行(而不是将所有不同的 df 组连接在一起)。大量其他小优化主要是通过使用线分析器发现的。

更新代码如下:

@njit
def set_list_indices(bids, asks, indices, indices_to_keep):
    entries = len(indices)

    lis_count = np.full(entries, 0)
    prev_index = np.full(entries, -1)

    longest_lis_count = -1
    longest_lis_index = -1

    for i in range(entries):
        predecessor_counts = np.where((bids <= bids[i]) & (asks <= asks[i]), lis_count, 0)
        best_predecessor_index = len(predecessor_counts) - np.argmax(predecessor_counts[::-1]) - 1

        if best_predecessor_index < i:
            prev_index[i] = best_predecessor_index

        new_count = predecessor_counts[best_predecessor_index] + 1
        lis_count[i] = new_count

        if new_count >= longest_lis_count:
            longest_lis_count = new_count
            longest_lis_index = i

    while longest_lis_index != -1:
        indices_to_keep[indices[longest_lis_index]] = True
        longest_lis_index = prev_index[longest_lis_index]


# necessary for lis algo, and groupby will preserve the order
df = df.sort_values(['Strike'], ascending=True)

# necessary for rows that were dropped. need reindexing for lis algo
df = df.reset_index(drop=True)

df_groups = df.groupby(['DTE'])

row_indices_to_keep = np.full(len(df.index), False, dtype=bool)
for name, group in df_groups:
    bids = group['Bid'].to_numpy()
    asks = group['Ask'].to_numpy()
    indices = group.index.to_numpy()
    set_list_indices(bids, asks, indices, row_indices_to_keep)

df = df.iloc[row_indices_to_keep]
Run Code Online (Sandbox Code Playgroud)

Ale*_*sky 3

寻找最长递增子序列的算法的复杂性是多少?

本文提供了一种复杂度为O(n log n)的算法。 更新:不起作用。 你甚至不需要修改代码,因为在 python 中比较适用于元组: assert (1, 2) < (3, 4)

>>> seq=[(10, 11), (16, 17), (17, 18), (11, 12), (12, 13), (13, 14)]
>>> subsequence(seq)
[(10, 11), (11, 12), (12, 13), (13, 14)]
Run Code Online (Sandbox Code Playgroud)

由于每一行必须引用前面的行才能计算出此时的最长递增子序列,因此似乎您不能并行执行此操作?

是的,但您可以并行计算每个 DTE 的序列。您可以尝试使用pandarallel之类的方法在.groupby().

>>> seq=[(10, 11), (16, 17), (17, 18), (11, 12), (12, 13), (13, 14)]
>>> subsequence(seq)
[(10, 11), (11, 12), (12, 13), (13, 14)]
Run Code Online (Sandbox Code Playgroud)

还要尝试摆脱 pandas (它非常慢)并使用原始 numpy 数组和 python 结构。您可以使用 O(n^2) 算法计算 LIS 索引,然后使用以下方法选择所需的行df.iloc