Improving the performance of a nested for loop iterating over dates

use*_*933 14 python sql numpy pandas google-bigquery

I would like to understand how to improve the performance of my code on a large dataframe (10 million rows). My solution loops over several dates (2023-01-10, 2023-01-20, 2023-01-30) for the different combinations of category_a and category_b.

The working approach is shown below: it iterates over the dates for each distinct pairing of the two categories, after first locating the subset for that specific pair. However, I would like to refactor it to see whether there is a more efficient way.

My input (df) looks like this:

        date  category_a  category_b  outflow  open  inflow   max  close  buy random_str
0 2023-01-10           4           1      1.0   0.0     0.0  10.0    0.0  0.0          a
1 2023-01-20           4           1      2.0   0.0     0.0  20.0    NaN  NaN          a
2 2023-01-30           4           1     10.0   0.0     0.0  20.0    NaN  NaN          a
3 2023-01-10           4           2      2.0   0.0     0.0  10.0    0.0  0.0          b
4 2023-01-20           4           2      2.0   0.0     0.0  20.0    NaN  NaN          b
5 2023-01-30           4           2      0.0   0.0     0.0  20.0    NaN  NaN          b

For the two pairs (4, 1) and (4, 2) over these dates, my expected output (result) looks like this:

        date  category_a  category_b  outflow  open  inflow  max  close   buy random_str
0 2023-01-10           4           1        1   0.0     0.0   10   -1.0  23.0          a
1 2023-01-20           4           1        2  -1.0    23.0   20   20.0  10.0          a
2 2023-01-30           4           1       10  20.0    10.0   20   20.0   NaN          a
3 2023-01-10           4           2        2   0.0     0.0   10   -2.0  24.0          b
4 2023-01-20           4           2        2  -2.0    24.0   20   20.0   0.0          b
5 2023-01-30           4           2        0  20.0     0.0   20   20.0   NaN          b

I have a working solution that takes a subset of the pandas dataframe and loops over it to produce the result, but I would like to see how its performance could be improved with numpy, numba, pandas-multiprocessing, or dask. Rewriting it in BigQuery SQL would be another good option.

I am not sure what the best solution would be, and I would appreciate any help that improves the performance.

Minimal working example

The code below generates the input dataframe.

import pandas as pd
import numpy as np

# prepare the input  df
df = pd.DataFrame({
    'date': ['2023-01-10', '2023-01-20', '2023-01-30', '2023-01-10', '2023-01-20', '2023-01-30'],
    'category_a': [4, 4, 4, 4, 4, 4],
    'category_b': [1, 1, 1, 2, 2, 2],
    'outflow': [1.0, 2.0, 10.0, 2.0, 2.0, 0.0],
    'open': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
    'inflow': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
    'max': [10.0, 20.0, 20.0, 10.0, 20.0, 20.0],
    'close': [0.0, np.nan, np.nan, 0.0, np.nan, np.nan],
    'buy': [0.0, np.nan, np.nan, 0.0, np.nan, np.nan],
    'random_str': ['a', 'a', 'a', 'b', 'b', 'b']
})

df['date'] = pd.to_datetime(df['date'])

# get unique pairs of category_a and category_b as a list of records
unique_pairs = (
    df.groupby(['category_a', 'category_b'])
      .size()
      .reset_index()
      .rename(columns={0: 'count'})
      [['category_a', 'category_b']]
      .to_dict('records')
)
unique_dates = np.sort(df['date'].unique())
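
For reference, an equivalent and shorter way to build unique_pairs (an alternative sketch, not the code being benchmarked) is:

# same list of records, via drop_duplicates
unique_pairs = df[['category_a', 'category_b']].drop_duplicates().to_dict('records')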

Using this input dataframe and Numpy, the code below is what I am trying to optimize.

import logging

# the loop below logs its progress; set up a basic logger for it
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

df = df.set_index('date')
day_0 = unique_dates[0]  # first date

# Using Dictionary comprehension

list_of_numbers = list(range(len(unique_pairs)))
myset  = {key: None for key in list_of_numbers}

for count_pair, value in enumerate(unique_pairs):
    
    # pair of category_a and category_b
    category_a = value['category_a']
    category_b = value['category_b']

    # subset the dataframe for the pair
    df_subset = df.loc[(df['category_a'] == category_a) & (df['category_b'] == category_b)]

    log.info(f" running for {category_a} and {category_b}")

    # day 0
    df_subset.loc[day_0, 'close'] = df_subset.loc[day_0, 'open'] + df_subset.loc[day_0, 'inflow'] - df_subset.loc[day_0, 'outflow']
    
    
    # loop over single pair using date
    for count, date in enumerate(unique_dates[1:], start=1):
        previous_date = unique_dates[count-1]

        df_subset.loc[date, 'open'] = df_subset.loc[previous_date, 'close']
        df_subset.loc[date, 'close'] = df_subset.loc[date, 'open'] + df_subset.loc[date, 'inflow'] - df_subset.loc[date, 'outflow']

        # check if closing value is negative, if so, set inflow to buy for next week's deficit

        if df_subset.loc[date, 'close'] < df_subset.loc[date, 'max']:
            df_subset.loc[previous_date, 'buy'] = df_subset.loc[date, 'max'] - df_subset.loc[date, 'close'] + df_subset.loc[date, 'inflow']
        elif df_subset.loc[date, 'close'] > df_subset.loc[date, 'max']:
            df_subset.loc[previous_date, 'buy'] = 0
        else:
            df_subset.loc[previous_date, 'buy'] = df_subset.loc[date, 'inflow']
        
        df_subset.loc[date, 'inflow'] = df_subset.loc[previous_date, 'buy']
        df_subset.loc[date, 'close'] = df_subset.loc[date, 'open'] + df_subset.loc[date, 'inflow'] - df_subset.loc[date, 'outflow']
    
    # store all the dataframes in a container myset
    myset[count_pair] = df_subset
    
# make myset into a dataframe
result = pd.concat(myset.values()).reset_index(drop=False)
result


Afterwards, we can check that the solution is the same as what we expected.

from pandas.testing import assert_frame_equal

expected = pd.DataFrame({
    'date': [pd.Timestamp('2023-01-10'), pd.Timestamp('2023-01-20'), pd.Timestamp('2023-01-30'),
             pd.Timestamp('2023-01-10'), pd.Timestamp('2023-01-20'), pd.Timestamp('2023-01-30')],
    'category_a': [4, 4, 4, 4, 4, 4],
    'category_b': [1, 1, 1, 2, 2, 2],
    'outflow': [1, 2, 10, 2, 2, 0],
    'open': [0.0, -1.0, 20.0, 0.0, -2.0, 20.0],
    'inflow': [0.0, 23.0, 10.0, 0.0, 24.0, 0.0],
    'max': [10, 20, 20, 10, 20, 20],
    'close': [-1.0, 20.0, 20.0, -2.0, 20.0, 20.0],
    'buy': [23.0, 10.0, np.nan, 24.0, 0.0, np.nan],
    'random_str': ['a', 'a', 'a', 'b', 'b', 'b']
})

# check that the result is the same as expected
assert_frame_equal(result, expected)

SQL to create the first table

The solution could also be in SQL; if so, the code below can be used to create the initial table.

I am busy trying to implement the solution in BigQuery SQL, using user-defined functions to carry over the logic. That would also be a good way to solve the problem.

WITH data AS (
  SELECT 
    DATE '2023-01-10' as date, 4 as category_a, 1 as category_b, 1 as outflow, 0 as open, 0 as inflow, 10 as max, 0 as close, 0 as buy, 'a' as random_str
  UNION ALL
  SELECT 
    DATE '2023-01-20' as date, 4 as category_a, 1 as category_b, 2 as outflow, 0 as open, 0 as inflow, 20 as max, NULL as close, NULL as buy, 'a' as random_str
  UNION ALL
  SELECT 
    DATE '2023-01-30' as date, 4 as category_a, 1 as category_b, 10 as outflow, 0 as open, 0 as inflow, 20 as max, NULL as close, NULL as buy, 'a' as random_str
  UNION ALL
  SELECT 
    DATE '2023-01-10' as date, 4 as category_a, 2 as category_b, 2 as outflow, 0 as open, 0 as inflow, 10 as max, 0 as close, 0 as buy, 'b' as random_str
  UNION ALL
  SELECT 
    DATE '2023-01-20' as date, 4 as category_a, 2 as category_b, 2 as outflow, 0 as open, 0 as inflow, 20 as max, NULL as close, NULL as buy, 'b' as random_str
  UNION ALL
  SELECT 
    DATE '2023-01-30' as date, 4 as category_a, 2 as category_b, 0 as outflow, 0 as open, 0 as inflow, 20 as max, NULL as close, NULL as buy, 'b' as random_str
)

SELECT 
  ROW_NUMBER() OVER (ORDER BY date) as row_num,
  date,
  category_a,
  category_b,
  outflow,
  open,
  inflow,
  max,
  close,
  buy,
  random_str
FROM data
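
For completeness, the statement above can also be run from Python with the official BigQuery client library. A minimal sketch, where my-project is a placeholder project id and credentials are assumed to be configured in the environment:

from google.cloud import bigquery

client = bigquery.Client(project='my-project')  # placeholder project id

# shortened to one row here; in practice, paste the full statement above
query = """
SELECT DATE '2023-01-10' as date, 4 as category_a, 1 as category_b
"""

df_from_bq = client.query(query).to_dataframe()  # fetch the result as a pandas dataframe
print(df_from_bq.head())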

Jér*_*ard 7

Efficient algorithm

First of all, the complexity of the algorithm can be improved. Indeed, (df['category_a'] == category_a) & (df['category_b'] == category_b) travels the whole dataframe, and this is done for every item of unique_pairs. The resulting run time is O(U R), where U = len(unique_pairs) and R = len(df).

An efficient solution is to perform a groupby first, that is, to split the dataframe into M groups, each sharing the same pair of categories. This operation can be done in O(R) time, where R is the number of rows of the dataframe. In practice, Pandas implements it with a (comparison-based) sort running in O(R log R).
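
To make the difference concrete, here is a minimal sketch (on a tiny hypothetical frame) contrasting the per-pair boolean masks with a single groupby pass:

import pandas as pd

df_demo = pd.DataFrame({'category_a': [4, 4, 4, 4],
                        'category_b': [1, 1, 2, 2],
                        'outflow': [1.0, 2.0, 2.0, 0.0]})

# O(U R): one full scan of the frame per unique pair
pairs = [(4, 1), (4, 2)]
subsets_slow = {p: df_demo[(df_demo['category_a'] == p[0]) & (df_demo['category_b'] == p[1])]
                for p in pairs}

# O(R log R): one partitioning pass shared by all the pairs
subsets_fast = dict(list(df_demo.groupby(['category_a', 'category_b'])))

assert all(subsets_slow[p].equals(subsets_fast[p]) for p in pairs)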

Faster accesses and conversion to Numpy

Moreover, accessing dataframe items one by one with loc is very slow. Indeed, Pandas needs to locate the column using an internal dictionary, find the row from the provided date, extract the value in the DataFrame based on the i-th row and j-th column, and create a new object and return it, not to mention the several checks performed along the way (e.g. types and bounds). On top of that, Pandas introduces a significant overhead, partly because its code is generally interpreted with CPython.
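
This overhead is easy to measure. A rough illustration (the exact timings are machine-dependent) comparing per-item loc accesses with plain Numpy indexing on the same data:

import timeit

import numpy as np
import pandas as pd

s = pd.Series(np.random.rand(1000), index=pd.date_range('2023-01-01', periods=1000))
arr = s.to_numpy()
dates = s.index

t_loc = timeit.timeit(lambda: [s.loc[d] for d in dates], number=100)
t_np = timeit.timeit(lambda: [arr[i] for i in range(len(arr))], number=100)
print(f'per-item loc: {t_loc:.3f} s, per-item Numpy: {t_np:.3f} s')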

A faster solution is to extract the columns ahead of time and to iterate over the rows using integers instead of values (like dates). One issue is that the order of the sorted dates may not be the order inside the dataframe subsets. I guess it is the case for your input dataframe in practice, but if not, you can sort the dataframe of each precomputed group by date. I also assume that all dates are present in all the subset dataframes (but again, you can correct the result of the groupby if that is not the case). Each column can then be converted to Numpy for faster accesses. The result is pure-Numpy code that no longer uses Pandas in the hot loop. Compute-heavy Numpy code is great because it can usually be optimized much further, especially when the target arrays contain native numerical types.

Here is the resulting implementation so far:

df = df.set_index('date')
day_0 = unique_dates[0]  # first date

# Using Dictionary comprehension

list_of_numbers = list(range(len(unique_pairs)))
myset = {key: None for key in list_of_numbers}

groups = dict(list(df.groupby(['category_a', 'category_b'])))

for count_pair, value in enumerate(unique_pairs):

    # pair of category_a and category_b
    category_a = value['category_a']
    category_b = value['category_b']

    # subset the dataframe for the pair
    df_subset = groups[(category_a, category_b)]

    # Extraction of the Pandas columns and conversion to Numpy ones
    col_open = df_subset['open'].to_numpy()
    col_close = df_subset['close'].to_numpy()
    col_inflow = df_subset['inflow'].to_numpy()
    col_outflow = df_subset['outflow'].to_numpy()
    col_max = df_subset['max'].to_numpy()
    col_buy = df_subset['buy'].to_numpy()

    # day 0
    col_close[0] = col_open[0] + col_inflow[0] - col_outflow[0]

    # loop over single pair using date
    for i in range(1, len(unique_dates)):
        col_open[i] = col_close[i-1]
        col_close[i] = col_open[i] + col_inflow[i] - col_outflow[i]

        # check if closing value is negative, if so, set inflow to buy for next week's deficit
        if col_close[i] < col_max[i]:
            col_buy[i-1] = col_max[i] - col_close[i] + col_inflow[i]
        elif col_close[i] > col_max[i]:
            col_buy[i-1] = 0
        else:
            col_buy[i-1] = col_inflow[i]

        col_inflow[i] = col_buy[i-1]
        col_close[i] = col_open[i] + col_inflow[i] - col_outflow[i]

    # store all the dataframes in a container myset
    myset[count_pair] = df_subset

# make myset into a dataframe
result = pd.concat(myset.values()).reset_index(drop=False)
result

This code is not only faster, it is also a bit easier to read.
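
As a quick sanity check, the assert_frame_equal test from the question can be re-run on the new result:

from pandas.testing import assert_frame_equal

# result comes from the block above, expected is the frame defined in the question
assert_frame_equal(result, expected)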

Fast execution with Numba

At this point, the usual solution would be to use vectorized functions, but doing that efficiently here (if it is even possible) is actually far from easy due to the loop-carried dependencies and the conditionals. A fast alternative is to use a JIT compiler like Numba to generate a very fast implementation. Numba is designed to work efficiently on Numpy arrays of native types, so this is the perfect use case. Note that Numba needs the input parameters to have a well-defined (native) type. Providing the types manually causes Numba to generate the code eagerly (during the function definition) instead of lazily (during the first execution).
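
To illustrate the difference between eager and lazy compilation, here is a small standalone sketch (unrelated to the actual computation):

import numba as nb
import numpy as np

@nb.njit('float64(float64[:])')  # eager: compiled when the function is defined
def total_eager(x):
    return x.sum()

@nb.njit  # lazy: compiled on the first call, for the argument types it receives
def total_lazy(x):
    return x.sum()

x = np.ones(10)
assert total_eager(x) == total_lazy(x) == 10.0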

Here is the final resulting code:

import numba as nb

# the signature matches the MWE, where all six columns are float64
@nb.njit('(float64[:], float64[:], float64[:], float64[:], float64[:], float64[:], int64)')
def compute(col_open, col_close, col_inflow, col_outflow, col_max, col_buy, n):
    # Important checks to avoid out-of-bounds accesses that are
    # not checked by Numba for the sake of performance.
    # If they do not hold and are not checked, then
    # the function can simply cause a crash.
    assert col_open.size == n and col_close.size == n
    assert col_inflow.size == n and col_outflow.size == n
    assert col_max.size == n and col_buy.size == n

    # day 0
    col_close[0] = col_open[0] + col_inflow[0] - col_outflow[0]

    # loop over single pair using date
    for i in range(1, n):
        col_open[i] = col_close[i-1]
        col_close[i] = col_open[i] + col_inflow[i] - col_outflow[i]

        # check if closing value is negative, if so, set inflow to buy for next week's deficit
        if col_close[i] < col_max[i]:
            col_buy[i-1] = col_max[i] - col_close[i] + col_inflow[i]
        elif col_close[i] > col_max[i]:
            col_buy[i-1] = 0
        else:
            col_buy[i-1] = col_inflow[i]

        col_inflow[i] = col_buy[i-1]
        col_close[i] = col_open[i] + col_inflow[i] - col_outflow[i]

df = df.set_index('date')
day_0 = unique_dates[0]  # first date

# Using Dictionary comprehension

list_of_numbers = list(range(len(unique_pairs)))
myset = {key: None for key in list_of_numbers}

groups = dict(list(df.groupby(['category_a', 'category_b'])))

for count_pair, value in enumerate(unique_pairs):
    # pair of category_a and category_b
    category_a = value['category_a']
    category_b = value['category_b']

    # subset the dataframe for the pair
    df_subset = groups[(category_a, category_b)]

    # Extraction of the Pandas columns and conversion to Numpy ones
    col_open = df_subset['open'].to_numpy()
    col_close = df_subset['close'].to_numpy()
    col_inflow = df_subset['inflow'].to_numpy()
    col_outflow = df_subset['outflow'].to_numpy()
    col_max = df_subset['max'].to_numpy()
    col_buy = df_subset['buy'].to_numpy()

    # Numba-accelerated computation
    compute(col_open, col_close, col_inflow, col_outflow, col_max, col_buy, len(unique_dates))

    # store all the dataframes in a container myset
    myset[count_pair] = df_subset

# make myset into a dataframe
result = pd.concat(myset.values()).reset_index(drop=False)
result

Feel free to change the types of the parameters if they do not match the actual input data types (e.g. int32 versus int64, or float64 versus int64). Note that you can replace types like float64[:] with float64[::1] if you know the input arrays are contiguous (which is very likely the case here). This results in faster generated code.
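
For example, a tiny standalone sketch of declaring contiguity (not the compute function above):

import numba as nb
import numpy as np

@nb.njit('float64(float64[::1])')  # [::1] declares a C-contiguous 1D array
def sum_contig(x):
    return x.sum()

print(sum_contig(np.arange(4.0)))  # freshly allocated arrays are contiguous: prints 6.0
# a strided view like np.arange(4.0)[::2] would be rejected by this signature;
# np.ascontiguousarray can copy such a view into a contiguous buffer first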

Note also that myset could simply be a list, since count_pair is an increasing integer. That would be simpler and faster, although the dictionary may be useful in your actual code.
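
A minimal sketch of the list-based container (with hypothetical stand-in frames):

import pandas as pd

frames = []  # replaces myset = {key: None for key in list_of_numbers}
for df_subset in (pd.DataFrame({'x': [1]}), pd.DataFrame({'x': [2]})):
    frames.append(df_subset)  # replaces myset[count_pair] = df_subset
result = pd.concat(frames).reset_index(drop=True)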

Performance results

The Numba function call takes about 1 µs on my machine, while the initial code takes 7.1 ms. This means the hot part of the code is about 7100 times faster on this small example alone. That being said, Pandas takes some time to convert the columns to Numpy, to create the groups, and to merge the dataframes. The first takes a small constant time that is negligible for large arrays. The latter two operations take more time on a bigger input dataframe, and they are actually the main bottleneck on my machine (each takes about 1 ms on the small example). Overall, the whole initial code takes 16.5 ms on my machine for this tiny example dataframe, versus 3.1 ms for the new code. This means the code is 5.3 times faster even on such a small input. On a bigger input dataframe, the speedup should be significantly better. Finally, note that df.groupby(['category_a', 'category_b']) was actually already precomputed, so I am not even sure we should include it in the benchmark ;).
