Pandas/SQL 共现计数

Mus*_*ger 8 python dataframe pandas

假设我有以下表格/数据框:

d = {'store': ['s1', 's1', 's2', 's2',], 'product': ['a', 'c', 'a', 'c']}
    df = pd.DataFrame(data=d)


print(df)
    store  product
0     s1      a                 
1     s1      c                     
3     s2      a                  
4     s2      c                
Run Code Online (Sandbox Code Playgroud)

我想找出每对产品在商店中共同出现的次数。

由于数据非常大(5M 行和大约 50K 个单独的产品和 20K 个单独的商店)并且有很多潜在的共现对,我只想获得每个产品的前 n 个(例如:10)共现和同时发生的次数。示例结果如下:

    product_1  product_2     cooccurrence_count
0      a           c                  2 
1      c           a                  2
Run Code Online (Sandbox Code Playgroud)

用 SQL 代替 Pandas 的有效且高效的解决方案也是可以接受的

Dim*_*try 1

只是因为这个问题写得很好,而且看起来像是一个很好的谜题,所以这里有一些魔力。

您可能需要存储大量数据,因此您需要尽可能压缩帧并多次遍历基础。如果数据库不包含原始对象,请将它们转换为整数,如果进行多重处理,数据帧将被复制到子进程中,因此保持其内容较小会有所帮助。

运行时间取决于数据帧的长度,还取决于唯一商店、唯一产品的数量以及要计数的成对块的大小。将工作分散到许多子流程可以加快速度,但所有累积的功能都会产生恒定的成本。例如,pandas 自己的方法在单个一万行数据帧上比在十几万行数据帧上运行得更快。当您对不可预测大小的子数据帧运行嵌套调用时,事情会变得复杂。您可能需要进行一些实验才能找到具有最佳速度\内存使用率的块大小。

首先用较小的数字测试运行时间。包括较少的商店和产品。话虽这么说,这不是一个快速的任务。在高端机器上,大约十分钟即可完成。

import pandas as pd, numpy as np
df = pd.DataFrame({
  'store':np.random.randint(0,int(2e4),int(5e6)),
  'product':np.random.randint(0,int(5e4),int(5e6))
  }).sort_values('store')

products = df['product'].unique()
N, chunksize, Ntop = len(products), int(1e4), 200
dtype = np.min_scalar_type(max(products.max(),N))
df = df.astype(dtype)

def store_cats(df):
    df = df.astype('category')
    cats = [df[x].cat.categories for x in df.columns]
    for col in df.columns:
        df[col] = df[col].cat.codes
    return df, cats    
def restore_cats(summary,cats):
    for col in ['product_x','product_y']:
        summary[col] = pandas.Categorical.from_codes(summary[col], cats)

def subsets(n = chunksize):
    n = int(n)
    res = [frozenset(products[i:i+n]) for i in range(0,N,n)]
    info = 'In total there will be {:.1E} pairs, per pass {:.1E} will be checked, thats up to around {} mb per pass, {} passes'
    print(info.format((N**2),(n*N),(n*N*3*8/1e6),len(res)))
    return res

def count(df,subset):
    res = df.merge(df,on = 'store')\
        .query('(product_x < product_y) and product_x in @subset')\
        .groupby(['product_x','product_y'])\
        .count()\
        .astype(dtype)\
        .reset_index()
    return res 
def one_pass(gr,subset):
    per_group = gr.apply(count,subset)
    total_counts = per_group.sort_values(['product_x','product_y'])\
        .groupby(['product_x','product_y'])\
        .agg('sum')\
        .sort_values('store',ascending=False)[:Ntop]\
        .copy().reset_index()
    return total_counts
def merge_passes(dfs):
    res = pd.concat(dfs,ignore_index=True)
    res = res.append(res.rename(columns={'product_x':'product_y','product_y':'product_x'}),ignore_index=True)
    res = res.sort_values('store',ascending=False)[:Ntop]
    return res

from concurrent.futures import as_completed, ProcessPoolExecutor as Pool

gr = df.groupby('store',as_index = False)
def worker(subset):
    return one_pass(gr,subset)
def run_progress(max_workers=2,chunksize=chunksize):
    from tqdm.auto import tqdm 
    with Pool(max_workers = max_workers) as p:
        futures = [p.submit(worker,subset) for subset in subsets(chunksize)]
        summaries = [x.result() for x in tqdm(as_completed(futures),total=len(futures))]
        return merge_passes(summaries)
Run Code Online (Sandbox Code Playgroud)