I'm working with a clickstream DataFrame, and for each user in the clickstream I'm extracting features to use in a machine learning project.
The DataFrame looks like this:
import pandas as pd
import numpy as np

data = pd.DataFrame({'id': ['A01', 'B01', 'A01', 'C01', 'A01', 'B01', 'A01'],
                     'event': ['search', 'search', 'buy', 'home', 'cancel', 'home', 'search'],
                     'date': ['2018-01-01', '2018-01-01', '2018-01-02', '2018-01-03', '2018-01-04', '2018-01-04', '2018-01-06'],
                     'product': ['tablet', 'dvd', 'tablet', 'tablet', 'tablet', 'book', 'book'],
                     'price': [103, 2, 203, 103, 203, 21, 21]})
data['date'] = pd.to_datetime(data['date'])
Since I have to create features per user, I use groupby/apply with a custom function, like:
featurized = data.groupby('id').apply(featurize)
The function that creates the user features takes a chunk of the DataFrame and builds many (hundreds of) features. The whole process is too slow, so I'm looking for suggestions on how to do this more efficiently.
An example of the function used to create the features:
def featurize(group):
    features = dict()
    # User id
    features['id'] = group['id'].max()
    # Feature 1: Number of search events
    features['number_of_search_events'] = (group['event'] == 'search').sum()
    # Feature 2: Number of tablets
    features['number_of_tablets'] = (group['product'] == 'tablet').sum()
    # Feature 3: Total time
    features['total_time'] = (group['date'].max() - group['date'].min()) / np.timedelta64(1, 'D')
    # Feature 4: Total number of events
    features['events'] = len(group)
    # Histogram of products examined
    product_counts = group['product'].value_counts()
    # Feature 5: max events for a product
    features['max_product_events'] = product_counts.max()
    # Feature 6: min events for a product
    features['min_product_events'] = product_counts.min()
    # Feature 7: avg events for a product
    features['mean_product_events'] = product_counts.mean()
    # Feature 8: std events for a product
    features['std_product_events'] = product_counts.std()
    # Feature 9: total price for tablet products
    features['tablet_price_sum'] = group.loc[group['product'] == 'tablet', 'price'].sum()
    # Feature 10: max price for tablet products
    features['tablet_price_max'] = group.loc[group['product'] == 'tablet', 'price'].max()
    # Feature 11: min price for tablet products
    features['tablet_price_min'] = group.loc[group['product'] == 'tablet', 'price'].min()
    # Feature 12: mean price for tablet products
    features['tablet_price_mean'] = group.loc[group['product'] == 'tablet', 'price'].mean()
    # Feature 13: std price for tablet products
    features['tablet_price_std'] = group.loc[group['product'] == 'tablet', 'price'].std()
    return pd.Series(features)
A potential problem is that each feature may scan the whole chunk, so with 100 features I scan the chunk 100 times instead of just once.
For example, one feature could be the number of 'tablet' events a user has, another the number of 'home' events, another the average time difference between 'search' events, then the average time difference between 'search' events for 'tablet', and so on. Each feature can be coded as a function that takes a chunk (df) and creates the feature, but with 100 features each one scans the whole chunk when a single linear scan would suffice. The problem is that if I loop over every record in the chunk manually and code all the features inside that loop, the code gets ugly.
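For reference, a single-pass, vectorized version of a few of the features above might look like the sketch below (assuming pandas >= 0.25 for named aggregation; the helper column names is_search, is_tablet and tablet_price are made up for this example), though I don't know how well this generalizes to hundreds of features:
tablet_price = data['price'].where(data['product'] == 'tablet')
vectorized = (
    data.assign(is_search=data['event'].eq('search'),
                is_tablet=data['product'].eq('tablet'),
                tablet_price=tablet_price)
        .groupby('id')
        .agg(number_of_search_events=('is_search', 'sum'),
             number_of_tablets=('is_tablet', 'sum'),
             first_date=('date', 'min'),
             last_date=('date', 'max'),
             events=('event', 'size'),
             tablet_price_sum=('tablet_price', 'sum'),
             tablet_price_max=('tablet_price', 'max'),
             tablet_price_min=('tablet_price', 'min'),
             tablet_price_mean=('tablet_price', 'mean'),
             tablet_price_std=('tablet_price', 'std'))
)
# total time per user, in days, from the min/max dates computed above
vectorized['total_time'] = (vectorized['last_date'] - vectorized['first_date']) / np.timedelta64(1, 'D')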
Questions:
If I have to scan the frame hundreds of times, is there a way to abstract this into a single scan that creates all the needed features?
Is there a speed improvement over the groupby/apply approach I'm currently using?
Disclaimer: the following does not properly answer the question above; I'm leaving it here for the work that went into it. Maybe it will be of some use at some point.
As for (1), parallelizing the groupby/apply, I could achieve up to a 43% speedup with the code below (i7-7700HQ CPU, 16GB RAM); (7) below re-uses the group.loc[group['product'] == 'tablet', 'price'] slice.
Timings
using joblib: 68.86841534099949s
using multiprocessing: 71.53540843299925s
single-threaded: 119.05010353899888s
Code
import pandas as pd
import numpy as np
import time
import timeit
import os
import joblib
import multiprocessing
def make_data():
    # just some test data ...
    n_users = 100
    events = ['search', 'buy', 'home', 'cancel']
    products = ['tablet', 'dvd', 'book']
    max_price = 1000
    n_duplicates = 1000
    n_rows = 40000
    df = pd.DataFrame({
        'id': list(map(str, np.random.randint(0, n_users, n_rows))),
        'event': list(map(events.__getitem__, np.random.randint(0, len(events), n_rows))),
        'date': list(map(pd.to_datetime, np.random.randint(0, 100000, n_rows))),
        'product': list(map(products.__getitem__, np.random.randint(0, len(products), n_rows))),
        'price': np.random.random(n_rows) * max_price
    })
    df = pd.concat([df for _ in range(n_duplicates)])
    df.to_pickle('big_df.pkl')
    return df

def data():
    return pd.read_pickle('big_df.pkl')
def featurize(group):
    features = dict()
    # Feature 1: Number of search events
    features['number_of_search_events'] = (group['event'] == 'search').sum()
    # Feature 2: Number of tablets
    features['number_of_tablets'] = (group['product'] == 'tablet').sum()
    # Feature 3: Total time
    features['total_time'] = (group['date'].max() - group['date'].min()) / np.timedelta64(1, 'D')
    # Feature 4: Total number of events
    features['events'] = len(group)
    # Histogram of products examined
    product_counts = group['product'].value_counts()
    # Feature 5: max events for a product
    features['max_product_events'] = product_counts.max()
    # Feature 6: min events for a product
    features['min_product_events'] = product_counts.min()
    # Feature 7: avg events for a product
    features['mean_product_events'] = product_counts.mean()
    # Feature 8: std events for a product
    features['std_product_events'] = product_counts.std()
    # Feature 9: total price for tablet products
    features['tablet_price_sum'] = group.loc[group['product'] == 'tablet', 'price'].sum()
    # Feature 10: max price for tablet products
    features['tablet_price_max'] = group.loc[group['product'] == 'tablet', 'price'].max()
    # Feature 11: min price for tablet products
    features['tablet_price_min'] = group.loc[group['product'] == 'tablet', 'price'].min()
    # Feature 12: mean price for tablet products
    features['tablet_price_mean'] = group.loc[group['product'] == 'tablet', 'price'].mean()
    # Feature 13: std price for tablet products
    features['tablet_price_std'] = group.loc[group['product'] == 'tablet', 'price'].std()
    return pd.DataFrame.from_records(features, index=[group['id'].max()])
# /sf/ask/1833143161/
def apply_parallel_job(dfGrouped, func):
    retLst = joblib.Parallel(n_jobs=multiprocessing.cpu_count())(
        joblib.delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

def apply_parallel_pool(dfGrouped, func):
    with multiprocessing.Pool(multiprocessing.cpu_count()) as p:
        ret_list = list(p.map(func, [group for name, group in dfGrouped]))
    return pd.concat(ret_list)
featurized_job = lambda df: apply_parallel_job(df.groupby('id'), featurize)
featurized_pol = lambda df: apply_parallel_pool(df.groupby('id'), featurize)
featurized_sng = lambda df: df.groupby('id').apply(featurize)
make_data()
print(timeit.timeit("featurized_job(data())", "from __main__ import featurized_job, data", number=3))
print(timeit.timeit("featurized_sng(data())", "from __main__ import featurized_sng, data", number=3))
print(timeit.timeit("featurized_pol(data())", "from __main__ import featurized_pol, data", number=3))
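A variant I have not benchmarked (so take it as a sketch; the helpers featurize_chunk and apply_parallel_chunks are made-up names) is to send whole chunks of groups to each worker instead of one group per task, which should reduce pickling and scheduling overhead:
def featurize_chunk(chunk):
    # run the per-group featurize on all groups handed to this worker
    return chunk.groupby('id').apply(featurize)

def apply_parallel_chunks(df, n_chunks=None):
    # split the user ids into one chunk per worker and featurize the chunks in parallel
    n_chunks = n_chunks or multiprocessing.cpu_count()
    id_chunks = np.array_split(df['id'].unique(), n_chunks)
    chunks = (df[df['id'].isin(ids)] for ids in id_chunks)
    results = joblib.Parallel(n_jobs=n_chunks)(
        joblib.delayed(featurize_chunk)(chunk) for chunk in chunks)
    return pd.concat(results)

# featurized_chk = apply_parallel_chunks(data())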
For (7), re-using the tablet price slice, consider the following refactoring:
Timings
original: 112.0091859719978s
re-used prices: 83.85681765000118s
Code
# [...]
# Features 9-13: price statistics for tablet products, computed from one re-used slice
prices_ = group.loc[group['product'] == 'tablet', 'price']
features['tablet_price_sum'] = prices_.sum()
features['tablet_price_max'] = prices_.max()
features['tablet_price_min'] = prices_.min()
features['tablet_price_mean'] = prices_.mean()
features['tablet_price_std'] = prices_.std()
# [...]
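Going one step further (not benchmarked), the five price statistics could presumably be collapsed into a single agg call on the re-used slice:
# sketch: compute all five tablet price statistics in one pass over the slice
prices_ = group.loc[group['product'] == 'tablet', 'price']
stats = prices_.agg(['sum', 'max', 'min', 'mean', 'std'])
features.update({'tablet_price_' + name: value for name, value in stats.items()})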