Best way to process a clickstream to create features in Pandas

use*_*284 7 python pandas

I am working with a clickstream DataFrame, and for each user in the clickstream I extract features to be used in a machine learning project.

The DataFrame looks like this:

data = pd.DataFrame({'id':['A01','B01','A01','C01','A01','B01','A01'],
                     'event':['search','search','buy','home','cancel','home','search'],
                     'date':['2018-01-01','2018-01-01','2018-01-02','2018-01-03','2018-01-04','2018-01-04','2018-01-06'],
                     'product':['tablet','dvd','tablet','tablet','tablet','book','book'],
                     'price': [103,2,203,103,203,21,21]})
data['date'] = pd.to_datetime(data['date'])

Since I have to create features per user, I use groupby/apply with a custom function, e.g.:

featurized = data.groupby('id').apply(featurize)

Creating the features for a user takes a chunk of the DataFrame and produces many (hundreds of) features. The whole process is too slow, so I am looking for recommendations to do this more efficiently.

An example of the function used to create the features:

def featurize(group):
    features = dict()

    # Userid
    features['id'] = group['id'].max()
    # Feature 1: Number of search events
    features['number_of_search_events'] = (group['event']=='search').sum()
    # Feature 2: Number of tablets
    features['number_of_tablets'] = (group['product']=='tablet').sum()
    # Feature 3: Total time
    features['total_time'] = (group['date'].max() - group['date'].min()) / np.timedelta64(1,'D')
    # Feature 4: Total number of events
    features['events'] = len(group)
    # Histogram of products examined
    product_counts = group['product'].value_counts()
    # Feature 5 max events for a product
    features['max_product_events'] = product_counts.max()
    # Feature 6 min events for a product
    features['min_product_events'] = product_counts.min()
    # Feature 7 avg events for a product
    features['mean_product_events'] = product_counts.mean()
    # Feature 8 std events for a product
    features['std_product_events'] = product_counts.std()
    # Feature 9 total price for tablet products
    features['tablet_price_sum'] = group.loc[group['product']=='tablet','price'].sum()
    # Feature 10 max price for tablet products
    features['tablet_price_max'] = group.loc[group['product']=='tablet','price'].max()
    # Feature 11 min price for tablet products
    features['tablet_price_min'] = group.loc[group['product']=='tablet','price'].min()
    # Feature 12 mean price for tablet products
    features['tablet_price_mean'] = group.loc[group['product']=='tablet','price'].mean()
    # Feature 13 std price for tablet products
    features['tablet_price_std'] = group.loc[group['product']=='tablet','price'].std()
    return pd.Series(features)

One potential problem is that each feature may scan the whole chunk, so if I have 100 features I scan the chunk 100 times instead of just once.

For example, one feature could be the number of "tablet" events a user has, another the number of "home" events, another the average time difference between "search" events, then the average time difference between "search" events for "tablet" products, and so on. Each feature can be coded as a function that takes a chunk (df) and creates the feature, but when we have 100 features, each one scans the whole chunk when a single linear scan would suffice. The problem is that if I manually loop over every record in the chunk and write all the features inside the loop, the code becomes ugly.
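The "one manual loop over the records" variant described above does not have to be ugly if all counters are accumulated in a single dictionary. A minimal sketch over the sample data from the question, covering only a few of the counting features (the function name is mine, not from the post):

```python
import pandas as pd

data = pd.DataFrame({'id': ['A01', 'B01', 'A01', 'C01', 'A01', 'B01', 'A01'],
                     'event': ['search', 'search', 'buy', 'home', 'cancel', 'home', 'search'],
                     'product': ['tablet', 'dvd', 'tablet', 'tablet', 'tablet', 'book', 'book']})

def featurize_single_scan(group):
    # every counter is updated in one pass over the group's rows
    features = {'number_of_search_events': 0, 'number_of_tablets': 0, 'events': 0}
    for row in group.itertuples(index=False):
        features['events'] += 1
        if row.event == 'search':
            features['number_of_search_events'] += 1
        if row.product == 'tablet':
            features['number_of_tablets'] += 1
    return pd.Series(features)

featurized = data.groupby('id').apply(featurize_single_scan)
```

This keeps a single linear scan per group, at the cost of a Python-level loop; whether it beats many vectorized passes depends on group sizes.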

Questions:

  1. If I have to process the chunk hundreds of times, is there a way to abstract this into a single scan that creates all the needed features?

  2. Is there a speed improvement over the groupby/apply approach I am currently using?

Mic*_*off 3

Disclaimer: the following answer does not properly answer the question above. I am leaving it here for the sake of the work invested. Maybe it will be of some use at some point.

  1. Re-use DataFrame selections (e.g. `group.loc[group['product']=='tablet','price']`)
  2. Parallelism (e.g. parallel apply after a pandas groupby; see code below)
  3. Use a cache if you run the computation multiple times (e.g. `HDFStore`)
  4. Avoid string operations; use native types that can be represented efficiently in numpy
  5. If you really need strings, use categorical columns (assuming they actually represent categorical data)
  6. If the frames are really large, consider processing them in chunks (e.g. the "large data" workflows with pandas)
  7. Use cython for further (possibly huge) improvements
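Point 5 above can be sketched like this: converting a repeated-string column to a pandas categorical stores each distinct string once and replaces the column data with small integer codes, which shrinks memory and speeds up comparisons (the data here is illustrative, not from the answer):

```python
import pandas as pd

# illustrative frame with heavily repeated string values
df = pd.DataFrame({
    'event': ['search', 'buy', 'home', 'search'] * 1000,
    'product': ['tablet', 'dvd', 'book', 'tablet'] * 1000,
})

mem_before = df.memory_usage(deep=True).sum()

# each distinct string is stored once; the columns hold integer codes
df['event'] = df['event'].astype('category')
df['product'] = df['product'].astype('category')

mem_after = df.memory_usage(deep=True).sum()

# comparisons used in featurize still work unchanged
n_search = (df['event'] == 'search').sum()
```

The `featurize` code itself needs no changes; only the dtype of the input columns does.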

As for (2), based on the code below, I achieved a speedup of up to 43% (i7-7700HQ CPU, 16 GB RAM).

Timings

using joblib: 68.86841534099949s
using multiprocessing: 71.53540843299925s
single-threaded: 119.05010353899888s

Code

import pandas as pd
import numpy as np
import timeit
import joblib
import multiprocessing


def make_data():
    # just some test data ...
    n_users = 100
    events = ['search', 'buy', 'home', 'cancel']
    products = ['tablet', 'dvd', 'book']
    max_price = 1000

    n_duplicates = 1000
    n_rows = 40000

    df = pd.DataFrame({
        'id': list(map(str, np.random.randint(0, n_users, n_rows))),
        'event': list(map(events.__getitem__, np.random.randint(0, len(events), n_rows))),
        'date': list(map(pd.to_datetime, np.random.randint(0, 100000, n_rows))),
        'product': list(map(products.__getitem__, np.random.randint(0, len(products), n_rows))),
        'price': np.random.random(n_rows) * max_price
    })
    df = pd.concat([df for _ in range(n_duplicates)])
    df.to_pickle('big_df.pkl')
    return df


def data():
    return pd.read_pickle('big_df.pkl')


def featurize(group):
    features = dict()

    # Feature 1: Number of search events
    features['number_of_search_events'] = (group['event'] == 'search').sum()
    # Feature 2: Number of tablets
    features['number_of_tablets'] = (group['product'] == 'tablet').sum()
    # Feature 3: Total time
    features['total_time'] = (group['date'].max() - group['date'].min()) / np.timedelta64(1, 'D')
    # Feature 4: Total number of events
    features['events'] = len(group)
    # Histogram of products examined
    product_counts = group['product'].value_counts()
    # Feature 5 max events for a product
    features['max_product_events'] = product_counts.max()
    # Feature 6 min events for a product
    features['min_product_events'] = product_counts.min()
    # Feature 7 avg events for a product
    features['mean_product_events'] = product_counts.mean()
    # Feature 8 std events for a product
    features['std_product_events'] = product_counts.std()
    # Feature 9 total price for tablet products
    features['tablet_price_sum'] = group.loc[group['product'] == 'tablet', 'price'].sum()
    # Feature 10 max price for tablet products
    features['tablet_price_max'] = group.loc[group['product'] == 'tablet', 'price'].max()
    # Feature 11 min price for tablet products
    features['tablet_price_min'] = group.loc[group['product'] == 'tablet', 'price'].min()
    # Feature 12 mean price for tablet products
    features['tablet_price_mean'] = group.loc[group['product'] == 'tablet', 'price'].mean()
    # Feature 13 std price for tablet products
    features['tablet_price_std'] = group.loc[group['product'] == 'tablet', 'price'].std()
    return pd.DataFrame.from_records(features, index=[group['id'].max()])


# /sf/ask/1833143161/
def apply_parallel_job(dfGrouped, func):
    retLst = joblib.Parallel(n_jobs=multiprocessing.cpu_count())(
        joblib.delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)


def apply_parallel_pool(dfGrouped, func):
    with multiprocessing.Pool(multiprocessing.cpu_count()) as p:
        ret_list = list(p.map(func, [group for name, group in dfGrouped]))
    return pd.concat(ret_list)


featurized_job = lambda df: apply_parallel_job(df.groupby('id'), featurize)
featurized_pol = lambda df: apply_parallel_pool(df.groupby('id'), featurize)
featurized_sng = lambda df: df.groupby('id').apply(featurize)

make_data()
print(timeit.timeit("featurized_job(data())", "from __main__ import featurized_job, data", number=3))
print(timeit.timeit("featurized_sng(data())", "from __main__ import featurized_sng, data", number=3))
print(timeit.timeit("featurized_pol(data())", "from __main__ import featurized_pol, data", number=3))

As for (1), consider the following refactoring:

Timings

original: 112.0091859719978s
re-used prices: 83.85681765000118s

Code

# [...]
prices_ = group.loc[group['product'] == 'tablet', 'price']
features['tablet_price_sum'] = prices_.sum()
# Feature 10 max price for tablet products
features['tablet_price_max'] = prices_.max()
# Feature 11 min price for tablet products
features['tablet_price_min'] = prices_.min()
# Feature 12 mean price for tablet products
features['tablet_price_mean'] = prices_.mean()
# Feature 13 std price for tablet products
features['tablet_price_std'] = prices_.std()
# [...]
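Regarding question 1, one option not benchmarked above is to drop the per-group `apply` entirely and express the features as vectorized grouped aggregations, so pandas computes each feature column-wise in optimized code instead of calling Python once per group. A hedged sketch covering a subset of the 13 features, using the sample data from the question:

```python
import pandas as pd
import numpy as np

data = pd.DataFrame({'id': ['A01', 'B01', 'A01', 'C01', 'A01', 'B01', 'A01'],
                     'event': ['search', 'search', 'buy', 'home', 'cancel', 'home', 'search'],
                     'date': pd.to_datetime(['2018-01-01', '2018-01-01', '2018-01-02',
                                             '2018-01-03', '2018-01-04', '2018-01-04', '2018-01-06']),
                     'product': ['tablet', 'dvd', 'tablet', 'tablet', 'tablet', 'book', 'book'],
                     'price': [103, 2, 203, 103, 203, 21, 21]})

g = data.groupby('id')
features = pd.DataFrame({
    # boolean masks summed per group replace the per-group comparisons
    'number_of_search_events': data['event'].eq('search').groupby(data['id']).sum(),
    'number_of_tablets': data['product'].eq('tablet').groupby(data['id']).sum(),
    'total_time': (g['date'].max() - g['date'].min()) / np.timedelta64(1, 'D'),
    'events': g.size(),
})

# tablet price statistics: filter once, group once, align back on the id index
tablet_prices = data.loc[data['product'] == 'tablet'].groupby('id')['price']
features['tablet_price_sum'] = tablet_prices.sum()
features['tablet_price_mean'] = tablet_prices.mean()
```

Users without any tablet rows get NaN in the `tablet_price_*` columns, matching what the original per-group `.sum()`/`.mean()` on an empty selection would not quite give (an empty `.sum()` is 0), so a `fillna(0)` may be needed depending on the desired semantics.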