python Pandas 中分块文件上数百万个键的 grouby 问题

swe*_*eet 3 python csv bigdata pandas

我有一个非常大的CSV文件(几十牡蛎)含有网络日志有以下的列:user_idtime_stampcategory_clicked。我必须建立一个评分器来确定用户喜欢和不喜欢的类别。请注意,我有超过 1000 万用户。

我第一次把它切成块,并把它们存储在一个HDFStore名为input.h5然后我用groupbyuser_id以下杰夫的方式

这是我的数据:大约 2 亿行,1000 万个唯一 user_id。

user id | timestamp | category_clicked
20140512081646222000004-927168801|20140722|7
20140512081714121000004-383009763|20140727|4
201405011348508050000041009490586|20140728|1
20140512081646222000004-927168801|20140724|1
20140501135024818000004-1623130763|20140728|3
Run Code Online (Sandbox Code Playgroud)

这是我的 pandas.show_version():

INSTALLED VERSIONS
------------------
commit: None
python: 2.7.6.final.0
python-bits: 64
OS: Windows
OS-release: 8
machine: AMD64
processor: AMD64 Family 21 Model 2 Stepping 0, AuthenticAMD
byteorder: little
LC_ALL: None
LANG: fr_FR

pandas: 0.13.1
Cython: 0.20.1
numpy: 1.8.1
scipy: 0.13.3
statsmodels: 0.5.0
IPython: 2.0.0
sphinx: 1.2.2
patsy: 0.2.1
scikits.timeseries: None
dateutil: 2.2
pytz: 2013.9
bottleneck: None
tables: 3.1.1
numexpr: 2.3.1
matplotlib: 1.3.1
openpyxl: None
xlrd: 0.9.3
xlwt: 0.7.5
xlsxwriter: None
sqlalchemy: 0.9.4
lxml: None
bs4: None
html5lib: None
bq: None
apiclient: None
Run Code Online (Sandbox Code Playgroud)

这是我想要的输出:

对于每个 user_id,一个[0.1,0.45,0.89,1.45,5.12,0.,0.,0.45,0.12,2.36,7.8]表示用户对每个类别的分数和一个全局分数的列表。我不能告诉你更多关于分数的信息,但它需要计算所有时间戳和 category_clicked 。你不能在以后总结或诸如此类的事情。

这是我的代码:

clean_input_reader = read_csv(work_path + '/input/input.csv', chunksize=500000)
with get_store(work_path+'/input/input.h5') as store:
    for chunk in clean_input_reader:
        store.append('clean_input', chunk,
                     data_columns=['user_id','timestamp','category_clicked'],
                     min_itemsize=15)

    groups = store.select_column('clean_input','user_id').unique()
    for user in groups:
        group_user = store.select('clean_input',where=['user_id==%s' %user])
        <<<<TREATMENT returns a list user_cat_score>>>>
        store.append(user, Series(user_cat_score))
Run Code Online (Sandbox Code Playgroud)

我的问题如下:在我看来,这条线的 group_user=store.select('clean_input',where=['user_id==%s' %user])时间复杂度太高了,因为我真的有很多组,而且我确信store.select如果我应用它 1000 万的例程中有很多冗余排序次。

为了给你一个估计,我用这种技术处理 1000 个键需要250 秒,而不是在没有分块的情况下读取全内存 CSV 文件的通常情况下只需要1 秒groupbyread_csv

**********更新***********

应用 Jeff 的散列方法后,我可以在 1 秒内处理 1000 个键(与完全内存方法的时间相同),并且绝对减少了 RAM 使用量。我以前没有的唯一时间损失当然是我用于分块、保存 100 个散列组以及从商店中的散列组中获取真实组所花费的时间。但是这个操作不会超过几分钟。

Jef*_*eff 5

这是任意缩放此问题的解决方案。这实际上是这里问题的高密度版本

定义一个函数以将特定组值散列到较少数量的组中。我会这样设计,将您的数据集划分为内存中可管理的部分。

def sub_group_hash(x):
    # x is a dataframe with the 'user id' field given above
    # return the last 2 characters of the input
    # if these are number like, then you will be sub-grouping into 100 sub-groups
    return x['user id'].str[-2:]
Run Code Online (Sandbox Code Playgroud)

使用上面提供的数据,这会在输入数据上创建一个分组框架,如下所示:

In [199]: [ (grp, grouped) for grp, grouped in df.groupby(sub_group_hash) ][0][1]
Out[199]: 
                             user id  timestamp  category
0  20140512081646222000004-927168801   20140722         7
3  20140512081646222000004-927168801   20140724         1
Run Code Online (Sandbox Code Playgroud)

grp作为组的名称,并grouped作为结果帧

# read in the input in a chunked way
clean_input_reader = read_csv('input.csv', chunksize=500000)
with get_store('output.h5') as store:
    for chunk in clean_input_reader:

        # create a grouper for each chunk using the sub_group_hash
        g = chunk.groupby(sub_group_hash)

        # append each of the subgroups to a separate group in the resulting hdf file
        # this will be a loop around the sub_groups (100 max in this case)
        for grp, grouped in g:

            store.append('group_%s' % grp, grouped,
                         data_columns=['user_id','timestamp','category_clicked'],
                         min_itemsize=15)
Run Code Online (Sandbox Code Playgroud)

现在您有一个包含 100 个子组的 hdf 文件(如果没有表示所有组,则可能更少),每个子组都包含执行操作所需的所有数据。

with get_store('output.h5') as store:

    # all of the groups are now the keys of the store
    for grp in store.keys():

        # this is a complete group that will fit in memory
        grouped = store.select(grp)

        # perform the operation on grouped and write the new output
        grouped.groupby(......).apply(your_cool_function)
Run Code Online (Sandbox Code Playgroud)

因此,在这种情况下,这会将问题减少 100 倍。如果这还不够,那么只需增加 sub_group_hash 以创建更多组。

你应该争取更小的数字,因为 HDF5 工作得更好(例如,不要制作 10M 的 sub_groups 违背目的,100、1000,甚至 10k 都可以)。但我认为 100 应该对你有用,除非你有一个非常疯狂的群体密度(例如,你在一个群体中有大量的人,而在其他群体中却很少)。

请注意,这个问题很容易扩展;如果需要,您可以将 sub_groups 存储在单独的文件中,和/或在必要时单独(并行)处理它们。

这应该使您的解决时间约为O(number_of_sub_groups)