如何有效地迭代Pandas数据帧的连续块

And*_*egg 42 python parallel-processing ipython pandas

我有一个大型数据帧(几百万行).

我希望能够对它进行groupby操作,但只需按任意连续(最好是相等大小)的行子集进行分组,而不是使用各行的任何特定属性来决定它们去哪个组.

用例:我想通过IPython中的并行映射将函数应用于每一行.哪个行转到哪个后端引擎并不重要,因为该函数一次基于一行计算结果.(从概念上讲,至少;实际上它是矢量化的.)

我想出了这样的事情:

# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to
max_idx = dataframe.index.max()
tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32)

# Use this value to perform a groupby, yielding 10 consecutive chunks
groups = [g[1] for g in dataframe.groupby(tenths)]

# Process chunks in parallel
results = dview.map_sync(my_function, groups)
Run Code Online (Sandbox Code Playgroud)

但这似乎很啰嗦,并不能保证大小相等.特别是如果索引是稀疏的或非整数的或其他什么.

有什么更好的方法吗?

谢谢!

Ive*_*lin 48

使用numpy有这个内置:np.array_split()

import numpy as np
import pandas as pd

data = pd.DataFrame(np.random.rand(10, 3))
for chunk in np.array_split(data, 5):
  assert len(chunk) == len(data) / 5
Run Code Online (Sandbox Code Playgroud)

  • 这是最优雅的方法.只是一个简单的内置函数调用,应该是接受的答案. (4认同)
  • 当数据帧的长度不能被块的数量整除时,该断言就不会成立,但否则它将表现出预期的效果-最后几个数据帧都比第一个数据帧短一排。 (3认同)
  • 注意:问题是关于任意数量的块,而 np.array_split 是关于特定数量的块,与批次/块大小的大小无关。请参阅 np.array_split 文档 - https://numpy.org/doc/stable/reference/ generated/numpy.array_split.html (3认同)
  • 这比替代方案慢大约 5 到 10 倍,例如按照建议使用 groupby,但在“np.arange”而不是索引上。 (2认同)

Rya*_*yan 41

我不确定这是否正是你想要的,但我在另一个SO线程上发现这些分组函数对于执行多处理器池非常有用.

这是该线程的一个简短示例,它可能会执行您想要的操作:

import numpy as np
import pandas as pds

df = pds.DataFrame(np.random.rand(14,4), columns=['a', 'b', 'c', 'd'])

def chunker(seq, size):
    return (seq[pos:pos + size] for pos in xrange(0, len(seq), size))

for i in chunker(df,5):
    print i
Run Code Online (Sandbox Code Playgroud)

这给你这样的东西:

          a         b         c         d
0  0.860574  0.059326  0.339192  0.786399
1  0.029196  0.395613  0.524240  0.380265
2  0.235759  0.164282  0.350042  0.877004
3  0.545394  0.881960  0.994079  0.721279
4  0.584504  0.648308  0.655147  0.511390
          a         b         c         d
5  0.276160  0.982803  0.451825  0.845363
6  0.728453  0.246870  0.515770  0.343479
7  0.971947  0.278430  0.006910  0.888512
8  0.044888  0.875791  0.842361  0.890675
9  0.200563  0.246080  0.333202  0.574488
           a         b         c         d
10  0.971125  0.106790  0.274001  0.960579
11  0.722224  0.575325  0.465267  0.258976
12  0.574039  0.258625  0.469209  0.886768
13  0.915423  0.713076  0.073338  0.622967
Run Code Online (Sandbox Code Playgroud)

我希望有所帮助.

编辑

在这种情况下,我以(近似)这种方式使用此函数和处理器池:

from multiprocessing import Pool

nprocs = 4

pool = Pool(nprocs)

for chunk in chunker(df, nprocs):
    data = pool.map(myfunction, chunk)
    data.domorestuff()
Run Code Online (Sandbox Code Playgroud)

我认为这应该与使用IPython分布式机器非常相似,但我还没有尝试过.


DSM*_*DSM 31

在实践中,你不能保证大小相等的块:毕竟,行数可能是素数,在这种情况下,你的唯一分块选项是大小为1或大块的块.我倾向于传递数组groupby.从...开始:

>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15)
>>> df[0] = range(15)
>>> df
    0         1         2         3         4
0   0  0.746300  0.346277  0.220362  0.172680
0   1  0.657324  0.687169  0.384196  0.214118
0   2  0.016062  0.858784  0.236364  0.963389
[...]
0  13  0.510273  0.051608  0.230402  0.756921
0  14  0.950544  0.576539  0.642602  0.907850

[15 rows x 5 columns]
Run Code Online (Sandbox Code Playgroud)

我故意通过将索引设置为0来使索引无法提供信息,我们只是决定我们的大小(这里是10)并按它对整数除以一个数组:

>>> df.groupby(np.arange(len(df))//10)
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c>
>>> for k,g in df.groupby(np.arange(len(df))//10):
...     print(k,g)
...     
0    0         1         2         3         4
0  0  0.746300  0.346277  0.220362  0.172680
0  1  0.657324  0.687169  0.384196  0.214118
0  2  0.016062  0.858784  0.236364  0.963389
[...]
0  8  0.241049  0.246149  0.241935  0.563428
0  9  0.493819  0.918858  0.193236  0.266257

[10 rows x 5 columns]
1     0         1         2         3         4
0  10  0.037693  0.370789  0.369117  0.401041
0  11  0.721843  0.862295  0.671733  0.605006
[...]
0  14  0.950544  0.576539  0.642602  0.907850

[5 rows x 5 columns]
Run Code Online (Sandbox Code Playgroud)

虽然您始终可以使用.iloc[a:b]忽略索引值并按位置访问数据,但基于切片DataFrame的方法可能会在索引与其不兼容时失败.

  • 这就是我的想法!技术上讲“df.groupby(np.arange(len(df)) // (len(df) / 10))”以获得固定数量的组(每个核心 1 个)而不是固定大小。出于某种原因,我没有想到分组键实际上根本不需要与索引相关...... (3认同)
  • 值得一提的是,为了提高效率,最好使用“迭代器”(https://pandas.pydata.org/pandas-docs/stable/genic/pandas.read_csv.html)和“大块大小”读取原始文件。 ”,以便read_csv函数进行读取,并且每个片段都可以传递给@Ryan描述的单独进程 (2认同)

And*_*hei 14

用于迭代 pandas 数据框和系列的块生成器函数

下面介绍了 chunk 函数的生成器版本。此外,此版本适用于 pd.DataFrame 或 pd.Series 的自定义索引(例如浮点类型索引)

    import numpy as np
    import pandas as pd

    df_sz = 14

    df = pd.DataFrame(np.random.rand(df_sz,4), 
                      index=np.linspace(0., 10., num=df_sz),
                      columns=['a', 'b', 'c', 'd']
                     )

    def chunker(seq, size):
        for pos in range(0, len(seq), size):
            yield seq.iloc[pos:pos + size] 

    chunk_size = 6
    for i in chunker(df, chunk_size):
        print(i)

   chnk = chunker(df, chunk_size)
   print('\n', chnk)
   print(next(chnk))
   print(next(chnk))
   print(next(chnk))
Run Code Online (Sandbox Code Playgroud)

输出是

                 A B C D
0.000000 0.560627 0.665897 0.683055 0.611884
0.769231 0.241871 0.357080 0.841945 0.340778
1.538462 0.065009 0.234621 0.250644 0.552410
2.307692 0.431394 0.235463 0.755084 0.114852
3.076923 0.173748 0.189739 0.148856 0.031171
3.846154 0.772352 0.697762 0.557806 0.254476
                 A B C D
4.615385 0.901200 0.977844 0.250316 0.957408
5.384615 0.400939 0.520841 0.863015 0.177043
6.153846 0.356927 0.344220 0.863067 0.400573
6.923077 0.375417 0.156420 0.897889 0.810083
7.692308 0.666371 0.152800 0.482446 0.955556
8.461538 0.242711 0.421591 0.005223 0.200596
                  A B C D
9.230769 0.735748 0.402639 0.527825 0.595952
10.000000 0.420209 0.365231 0.966829 0.514409

- 生成器对象分块器位于 0x7f503c9d0ba0

第一个“下一个()”:
                 A B C D
0.000000 0.560627 0.665897 0.683055 0.611884
0.769231 0.241871 0.357080 0.841945 0.340778
1.538462 0.065009 0.234621 0.250644 0.552410
2.307692 0.431394 0.235463 0.755084 0.114852
3.076923 0.173748 0.189739 0.148856 0.031171
3.846154 0.772352 0.697762 0.557806 0.254476

第二个“下一个()”:
                 A B C D
4.615385 0.901200 0.977844 0.250316 0.957408
5.384615 0.400939 0.520841 0.863015 0.177043
6.153846 0.356927 0.344220 0.863067 0.400573
6.923077 0.375417 0.156420 0.897889 0.810083
7.692308 0.666371 0.152800 0.482446 0.955556
8.461538 0.242711 0.421591 0.005223 0.200596

第三个“下一个()”:
                  A B C D
9.230769 0.735748 0.402639 0.527825 0.595952
10.000000 0.420209 0.365231 0.966829 0.514409


Mil*_*les 12

良好环境的标志是很多选择,所以我将从Anaconda Blaze中添加这个,真正使用Odo

import blaze as bz
import pandas as pd

df = pd.DataFrame({'col1':[1,2,3,4,5], 'col2':[2,4,6,8,10]})

for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2):
    # Do stuff with chunked dataframe
Run Code Online (Sandbox Code Playgroud)

  • 不幸的是,Odo 似乎不再被维护。在撰写本文时,最后一次提交是在 11 个月前,并且贡献图已逐渐减少到零。 (2认同)

wll*_*bll 5

import pandas as pd

def batch(iterable, batch_number=10):
    """
    split an iterable into mini batch with batch length of batch_number
    supports batch of a pandas dataframe
    usage:
        for i in batch([1,2,3,4,5], batch_number=2):
            print(i)
        
        for idx, mini_data in enumerate(batch(df, batch_number=10)):
            print(idx)
            print(mini_data)
    """
    l = len(iterable)

    for idx in range(0, l, batch_number):
        if isinstance(iterable, pd.DataFrame):
            # dataframe can't split index label, should iter according index
            yield iterable.iloc[idx:min(idx+batch_number, l)]
        else:
            yield iterable[idx:min(idx+batch_number, l)]
Run Code Online (Sandbox Code Playgroud)