Tags: python, sql, numpy, pandas, google-bigquery
I would like to understand how to improve the performance of my code on a large dataframe (10 million rows), where my solution loops over several dates (2023-01-10, 2023-01-20, 2023-01-30) for the different combinations of `category_a` and `category_b`.

The working approach shown below iterates over the dates for each distinct pairing of the two categories by first locating the subset for a specific pair. However, I would like to refactor it to see whether there is a more efficient way.

My input (`df`) looks like this:
| | date | category_a | category_b | outflow | open | inflow | max | close | buy | random_str |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2023-01-10 | 4 | 1 | 1 | 0 | 0 | 10 | 0 | 0 | a |
| 1 | 2023-01-20 | 4 | 1 | 2 | 0 | 0 | 20 | NaN | NaN | a |
| 2 | 2023-01-30 | 4 | 1 | 10 | 0 | 0 | 20 | NaN | NaN | a |
| 3 | 2023-01-10 | 4 | 2 | 2 | 0 | 0 | 10 | 0 | 0 | b |
| 4 | 2023-01-20 | 4 | 2 | 2 | 0 | 0 | 20 | NaN | NaN | b |
| 5 | 2023-01-30 | 4 | 2 | 0 | 0 | 0 | 20 | NaN | NaN | b |
For the two pairs (4, 1) and (4, 2) over these dates, my expected output (`results`) looks like this:
| | date | category_a | category_b | outflow | open | inflow | max | close | buy | random_str |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2023-01-10 | 4 | 1 | 1 | 0 | 0 | 10 | -1 | 23 | a |
| 1 | 2023-01-20 | 4 | 1 | 2 | -1 | 23 | 20 | 20 | 10 | a |
| 2 | 2023-01-30 | 4 | 1 | 10 | 20 | 10 | 20 | 20 | NaN | a |
| 3 | 2023-01-10 | 4 | 2 | 2 | 0 | 0 | 10 | -2 | 24 | b |
| 4 | 2023-01-20 | 4 | 2 | 2 | -2 | 24 | 20 | 20 | 0 | b |
| 5 | 2023-01-30 | 4 | 2 | 0 | 20 | 0 | 20 | 20 | NaN | b |
I have a working solution that takes a subset of the pandas dataframe and then loops over it to get the solution, but I would like to see whether its performance can be improved with `numpy`, `numba`, pandas multiprocessing, or `dask`. Another good idea would be to rewrite it in BigQuery SQL.

I am not sure what the best solution is, and I would appreciate any help that improves the performance.
Minimal working example

The code below generates the input dataframe.
import pandas as pd
import numpy as np
# prepare the input df
df = pd.DataFrame({
    'date'       : ['2023-01-10', '2023-01-20', '2023-01-30', '2023-01-10', '2023-01-20', '2023-01-30'],
    'category_a' : [4, 4, 4, 4, 4, 4],
    'category_b' : [1, 1, 1, 2, 2, 2],
    'outflow'    : [1.0, 2.0, 10.0, 2.0, 2.0, 0.0],
    'open'       : [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
    'inflow'     : [0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
    'max'        : [10.0, 20.0, 20.0, 10.0, 20.0, 20.0],
    'close'      : [0.0, np.nan, np.nan, 0.0, np.nan, np.nan],
    'buy'        : [0.0, np.nan, np.nan, 0.0, np.nan, np.nan],
    'random_str' : ['a', 'a', 'a', 'b', 'b', 'b']
})
df['date'] = pd.to_datetime(df['date'])
# get unique pairs of category_a and category_b in a dictionary
unique_pairs = (
    df.groupby(['category_a', 'category_b'])
      .size()
      .reset_index()
      .rename(columns={0: 'count'})[['category_a', 'category_b']]
      .to_dict('records')
)
unique_dates = np.sort(df['date'].unique())
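For reference, with this input the two helpers evaluate to the two category pairs and the three sorted dates (shown here as comments, with the exact reprs abbreviated):

# unique_pairs -> [{'category_a': 4, 'category_b': 1}, {'category_a': 4, 'category_b': 2}]
# unique_dates -> [2023-01-10, 2023-01-20, 2023-01-30] as datetime64[ns] values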
Using this input dataframe and NumPy, the code below is what I am trying to optimize.
import logging
log = logging.getLogger(__name__)  # logger for the progress messages below

df = df.set_index('date')
day_0 = unique_dates[0]  # first date

# Using a dictionary comprehension to prepare the result container
list_of_numbers = list(range(len(unique_pairs)))
myset = {key: None for key in list_of_numbers}

for count_pair, value in enumerate(unique_pairs):
    # pair of category_a and category_b
    category_a = value['category_a']
    category_b = value['category_b']
    # subset the dataframe for the pair (copy to avoid SettingWithCopyWarning)
    df_subset = df.loc[(df['category_a'] == category_a) & (df['category_b'] == category_b)].copy()
    log.info(f"running for {category_a} and {category_b}")
    # day 0
    df_subset.loc[day_0, 'close'] = df_subset.loc[day_0, 'open'] + df_subset.loc[day_0, 'inflow'] - df_subset.loc[day_0, 'outflow']
    # loop over the single pair using the dates
    for count, date in enumerate(unique_dates[1:], start=1):
        previous_date = unique_dates[count - 1]
        df_subset.loc[date, 'open'] = df_subset.loc[previous_date, 'close']
        df_subset.loc[date, 'close'] = df_subset.loc[date, 'open'] + df_subset.loc[date, 'inflow'] - df_subset.loc[date, 'outflow']
        # if close is below max, set buy on the previous date to cover the next period's deficit
        if df_subset.loc[date, 'close'] < df_subset.loc[date, 'max']:
            df_subset.loc[previous_date, 'buy'] = df_subset.loc[date, 'max'] - df_subset.loc[date, 'close'] + df_subset.loc[date, 'inflow']
        elif df_subset.loc[date, 'close'] > df_subset.loc[date, 'max']:
            df_subset.loc[previous_date, 'buy'] = 0
        else:
            df_subset.loc[previous_date, 'buy'] = df_subset.loc[date, 'inflow']
        df_subset.loc[date, 'inflow'] = df_subset.loc[previous_date, 'buy']
        df_subset.loc[date, 'close'] = df_subset.loc[date, 'open'] + df_subset.loc[date, 'inflow'] - df_subset.loc[date, 'outflow']
    # store all the dataframes in the container myset
    myset[count_pair] = df_subset

# make myset into a dataframe
result = pd.concat(myset.values()).reset_index(drop=False)
result
Afterwards we can check that the solution is the same as expected.
from pandas.testing import assert_frame_equal
expected = pd.DataFrame({
    'date' : [pd.Timestamp('2023-01-10'), pd.Timestamp('2023-01-20'), pd.Timestamp('2023-01-30'),
              pd.Timestamp('2023-01-10'), pd.Timestamp('2023-01-20'), pd.Timestamp('2023-01-30')],
    'category_a' : [4, 4, 4, 4, 4, 4],
    'category_b' : [1, 1, 1, 2, 2, 2],
    'outflow' : [1.0, 2.0, 10.0, 2.0, 2.0, 0.0],   # floats, to match the dtypes of the input df
    'open' : [0.0, -1.0, 20.0, 0.0, -2.0, 20.0],
    'inflow' : [0.0, 23.0, 10.0, 0.0, 24.0, 0.0],
    'max' : [10.0, 20.0, 20.0, 10.0, 20.0, 20.0],  # floats, to match the dtypes of the input df
    'close' : [-1.0, 20.0, 20.0, -2.0, 20.0, 20.0],
    'buy' : [23.0, 10.0, np.nan, 24.0, 0.0, np.nan],
    'random_str' : ['a', 'a', 'a', 'b', 'b', 'b']
})
# check that the result is the same as expected
assert_frame_equal(result, expected)
SQL to create the initial table

The solution could also be written in SQL; if so, the code below can be used to create the initial table. I am currently busy trying to implement the solution in BigQuery SQL, using a user-defined function to keep the same logic. That would also be a good way to solve the problem.
WITH data AS (
SELECT
DATE '2023-01-10' as date, 4 as category_a, 1 as category_b, 1 as outflow, 0 as open, 0 as inflow, 10 as max, 0 as close, 0 as buy, 'a' as random_str
UNION ALL
SELECT
DATE '2023-01-20' as date, 4 as category_a, 1 as category_b, 2 as outflow, 0 as open, 0 as inflow, 20 as max, NULL as close, NULL as buy, 'a' as random_str
UNION ALL
SELECT
DATE '2023-01-30' as date, 4 as category_a, 1 as category_b, 10 as outflow, 0 as open, 0 as inflow, 20 as max, NULL as close, NULL as buy, 'a' as random_str
UNION ALL
SELECT
DATE '2023-01-10' as date, 4 as category_a, 2 as category_b, 2 as outflow, 0 as open, 0 as inflow, 10 as max, 0 as close, 0 as buy, 'b' as random_str
UNION ALL
SELECT
DATE '2023-01-20' as date, 4 as category_a, 2 as category_b, 2 as outflow, 0 as open, 0 as inflow, 20 as max, NULL as close, NULL as buy, 'b' as random_str
UNION ALL
SELECT
DATE '2023-01-30' as date, 4 as category_a, 2 as category_b, 0 as outflow, 0 as open, 0 as inflow, 20 as max, NULL as close, NULL as buy, 'b' as random_str
)
SELECT
  ROW_NUMBER() OVER (ORDER BY date) AS row_num,  -- index column; an alias like " " is not valid BigQuery syntax
date,
category_a,
category_b,
outflow,
open,
inflow,
max,
close,
buy,
random_str
FROM data
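If the table lives in BigQuery, the result can be pulled back into pandas from Python. Below is a minimal sketch (not from the original post), assuming the google-cloud-bigquery package is installed and application-default credentials are configured; `sql` is assumed to hold the query shown above.

from google.cloud import bigquery  # assumes google-cloud-bigquery is installed

client = bigquery.Client()  # uses the default project and credentials from the environment
sql = """..."""             # paste the WITH ... SELECT query from above here

# Runs the query job, waits for it, and loads the rows into a pandas DataFrame
# (recent client versions need pyarrow/db-dtypes for this conversion)
df_from_bq = client.query(sql).to_dataframe()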
Answer

First of all, the algorithmic complexity can be improved. Indeed, `(df['category_a'] == category_a) & (df['category_b'] == category_b)` traverses the whole dataframe, and this is done for every item of `unique_pairs`. The resulting running time is O(U R), with U = len(unique_pairs) and R = len(df).

An efficient solution is to perform a groupby, i.e. to split the dataframe into M groups, each sharing the same pair of categories. This operation can be done in O(R) time, where R is the number of rows of the dataframe. In practice, Pandas implements it with a (comparison-based) sort running in O(R log R).
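Here is a minimal sketch of the contrast, reusing the question's variable names (not part of the original answer):

# O(U * R): one full scan of df for every pair
for value in unique_pairs:
    mask = (df['category_a'] == value['category_a']) & (df['category_b'] == value['category_b'])
    df_subset = df[mask]

# O(R log R) once: a single groupby materializes every group up front,
# after which each per-pair lookup is a cheap dictionary access
groups = dict(list(df.groupby(['category_a', 'category_b'])))
for value in unique_pairs:
    df_subset = groups[(value['category_a'], value['category_b'])]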
Moreover, accessing dataframe items with `loc` is very slow. Indeed, Pandas needs to use an internal dictionary to find the location of the column, look up the row based on the provided date, extract the value in the DataFrame based on its i-th row and j-th column, and create a new object to return, not to mention the several checks performed along the way (e.g. types and bounds). On top of that, Pandas introduces a huge overhead, partially because its code is mostly interpreted with CPython.

A faster solution is to extract the columns ahead of time and to iterate over the rows using integers instead of values (like dates). One issue is that the order of the sorted dates may not match the order within each dataframe subset. I guess it does match for your input dataframe in practice, but if it does not, you can sort the dataframe of each precomputed group by date. I also assume that all dates are present in all subset dataframes (again, if this is not the case, you can correct the result of the `groupby`). Each column can then be converted to Numpy, which is faster. The result is pure Numpy code that no longer uses Pandas. Computationally intensive Numpy code is great because it can often be heavily optimized, especially when the target arrays contain native numerical types.
Here is the implementation so far:
df = df.set_index('date')
day_0 = unique_dates[0]  # first date

# Using a dictionary comprehension to prepare the result container
list_of_numbers = list(range(len(unique_pairs)))
myset = {key: None for key in list_of_numbers}

# Precompute all the groups at once (one pass instead of one scan per pair)
groups = dict(list(df.groupby(['category_a', 'category_b'])))

for count_pair, value in enumerate(unique_pairs):
    # pair of category_a and category_b
    category_a = value['category_a']
    category_b = value['category_b']

    # subset the dataframe for the pair
    df_subset = groups[(category_a, category_b)]

    # Extraction of the Pandas columns and conversion to Numpy ones
    col_open = df_subset['open'].to_numpy()
    col_close = df_subset['close'].to_numpy()
    col_inflow = df_subset['inflow'].to_numpy()
    col_outflow = df_subset['outflow'].to_numpy()
    col_max = df_subset['max'].to_numpy()
    col_buy = df_subset['buy'].to_numpy()

    # day 0
    col_close[0] = col_open[0] + col_inflow[0] - col_outflow[0]

    # loop over the single pair using integer indices instead of dates
    for i in range(1, len(unique_dates)):
        col_open[i] = col_close[i-1]
        col_close[i] = col_open[i] + col_inflow[i] - col_outflow[i]

        # if close is below max, buy the next period's deficit; above max, buy nothing
        if col_close[i] < col_max[i]:
            col_buy[i-1] = col_max[i] - col_close[i] + col_inflow[i]
        elif col_close[i] > col_max[i]:
            col_buy[i-1] = 0
        else:
            col_buy[i-1] = col_inflow[i]

        col_inflow[i] = col_buy[i-1]
        col_close[i] = col_open[i] + col_inflow[i] - col_outflow[i]

    # store all the dataframes in the container myset
    myset[count_pair] = df_subset

# make myset into a dataframe
result = pd.concat(myset.values()).reset_index(drop=False)
result
This code is not only faster, it is also easier to read.
At this point, the usual next step would be to use vectorized functions, but doing that efficiently here (if it is possible at all) is really not easy because of the loop-carried dependency and the conditionals. A fast solution is to use a JIT compiler like Numba to generate a very fast implementation. Numba is designed to work efficiently on Numpy arrays of native types, so this is the perfect use case. Note that Numba needs the input parameters to have a well-defined (native) type. Providing the types manually causes Numba to generate the code eagerly (during the function definition) instead of lazily (during the first execution).
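A small illustration of the eager/lazy distinction (a sketch, not from the original answer):

import numba as nb

@nb.njit  # lazy: compiled for the observed argument types on the first call
def f_lazy(x):
    return x + 1.0

@nb.njit('float64(float64)')  # eager: compiled right here, at definition time
def f_eager(x):
    return x + 1.0

f_lazy(1.0)   # compilation happens now
f_eager(1.0)  # already compiled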
Here is the final resulting code:
import numba as nb

@nb.njit('(float64[:], float64[:], float64[:], int64[:], int64[:], float64[:], int64)')
def compute(col_open, col_close, col_inflow, col_outflow, col_max, col_buy, n):
    # Important checks to avoid out-of-bounds accesses, which Numba
    # does not check for the sake of performance. If they do not hold
    # and are not checked, the function can simply crash.
    assert col_open.size == n and col_close.size == n
    assert col_inflow.size == n and col_outflow.size == n
    assert col_max.size == n and col_buy.size == n

    # day 0
    col_close[0] = col_open[0] + col_inflow[0] - col_outflow[0]

    # loop over the single pair using integer indices
    for i in range(1, n):
        col_open[i] = col_close[i-1]
        col_close[i] = col_open[i] + col_inflow[i] - col_outflow[i]

        # if close is below max, buy the next period's deficit; above max, buy nothing
        if col_close[i] < col_max[i]:
            col_buy[i-1] = col_max[i] - col_close[i] + col_inflow[i]
        elif col_close[i] > col_max[i]:
            col_buy[i-1] = 0
        else:
            col_buy[i-1] = col_inflow[i]

        col_inflow[i] = col_buy[i-1]
        col_close[i] = col_open[i] + col_inflow[i] - col_outflow[i]

df = df.set_index('date')
day_0 = unique_dates[0]  # first date

# Using a dictionary comprehension to prepare the result container
list_of_numbers = list(range(len(unique_pairs)))
myset = {key: None for key in list_of_numbers}

# Precompute all the groups at once
groups = dict(list(df.groupby(['category_a', 'category_b'])))

for count_pair, value in enumerate(unique_pairs):
    # pair of category_a and category_b
    category_a = value['category_a']
    category_b = value['category_b']

    # subset the dataframe for the pair
    df_subset = groups[(category_a, category_b)]

    # Extraction of the Pandas columns and conversion to Numpy ones
    col_open = df_subset['open'].to_numpy()
    col_close = df_subset['close'].to_numpy()
    col_inflow = df_subset['inflow'].to_numpy()
    col_outflow = df_subset['outflow'].to_numpy()
    col_max = df_subset['max'].to_numpy()
    col_buy = df_subset['buy'].to_numpy()

    # Numba-accelerated computation
    compute(col_open, col_close, col_inflow, col_outflow, col_max, col_buy, len(unique_dates))

    # store all the dataframes in the container myset
    myset[count_pair] = df_subset

# make myset into a dataframe
result = pd.concat(myset.values()).reset_index(drop=False)
result
Feel free to change the types of the parameters if they do not match the actual input data types (e.g. int32 vs int64, or float64 vs int64). Note that a type like `float64[:]` can be replaced by `float64[::1]` if you know the input array is contiguous (which is very likely the case here). This generates faster code.
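For example, here is a sketch of the contiguous variant on a toy function (the names are illustrative, not from the original answer):

import numpy as np
import numba as nb

# The [::1] signature promises a C-contiguous array, letting Numba emit faster code
@nb.njit('(float64[::1], int64)')
def add_one(arr, n):
    for i in range(n):
        arr[i] += 1.0

view = np.arange(10, dtype=np.float64)[::2]  # a strided view: NOT contiguous
col = np.ascontiguousarray(view)             # copy into a contiguous buffer first
add_one(col, col.size)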
Also note that `myset` could simply be a list, since `count_pair` is an increasing integer. This is simpler and faster, although the dictionary may be useful in your actual code.
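A minimal sketch of that variant, reusing `groups` and `unique_pairs` from the code above (the per-pair body is elided):

myset = []  # a plain list is enough: pairs are processed in increasing order
for value in unique_pairs:
    df_subset = groups[(value['category_a'], value['category_b'])]
    # ... same per-pair Numba computation as above ...
    myset.append(df_subset)

result = pd.concat(myset).reset_index(drop=False)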
The Numba function call takes about 1 µs on my machine, while the initial code takes 7.1 ms. This means the hot part of the code is about 7100 times faster on this small example alone. That being said, Pandas still needs some time to convert the columns to Numpy, to create the groups, and to merge the dataframes. The first of these takes a small, constant time that is negligible for large arrays. The other two operations take more time on a larger input dataframe, and they are actually the main bottleneck on my machine (each takes about 1 ms on this small example). Overall, on my machine the whole initial code takes 16.5 ms on this tiny example dataframe, while the new code takes 3.1 ms. That is a 5.3x speedup for such a small input, and the speedup should be significantly better on a bigger input dataframe. Finally, note that `df.groupby(['category_a', 'category_b'])` was effectively precomputed already (it is used to build `unique_pairs`), so I am not even sure it should be included in the benchmark ;).
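These timings are machine-dependent; a minimal way to reproduce such measurements is `timeit` (a sketch; `run_pipeline` is a hypothetical zero-argument wrapper around whichever part you want to benchmark):

import timeit

def run_pipeline():
    pass  # hypothetical: call the grouped Numba pipeline (or the initial code) here

mean_s = timeit.timeit(run_pipeline, number=1000) / 1000
print(f"mean time per run: {mean_s * 1e3:.3f} ms")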