Dask:创建严格递增索引

Ian*_*Ian 7 python python-3.x dask

正如有据可查的那样,Dask 在reset_index调用时会在每个分区的基础上创建一个严格递增的索引,从而导致整个集合上出现重复的索引。在 Dask 中创建严格递增索引(不必是连续的)在整个集合上的最佳方法(例如计算最快)是什么?我希望map_partitions能传递分区号,但我认为不会。谢谢。

编辑

谢谢@MRocklin,我已经做到了这一点,但我需要一些关于如何将我的系列与原始数据框重新组合的帮助。

def create_increasing_index(ddf:dd.DataFrame):
    mps = int(len(ddf) / ddf.npartitions + 1000)
    values = ddf.index.values

    def do(x, max_partition_size, block_id=None):
        length = len(x)
        if length == 0:
            raise ValueError("Does not work with empty partitions. Consider using dask.repartition.")

        start = block_id[0] * max_partition_size
        return da.arange(start, start+length, chunks=1)

    series = values.map_blocks(do, max_partition_size=mps, dtype=np.int64)
    ddf2 = dd.concat([ddf, dd.from_array(series)], axis=1)
    return ddf2
Run Code Online (Sandbox Code Playgroud)

我收到错误“ValueError:无法将 DataFrame 与指定 axis=1 的未知除法连接”。有没有比使用 dd.concat 更好的方法?谢谢。

再次编辑

实际上,就我的目的而言(以及我测试的数据量 - 只有几 GB),cumsum 已经足够快了。当这变得太慢时我会重新访问!

MRo*_*lin 4

实现此目的的一种相当慢的方法是创建一个新列,然后使用cumsum

ddf['x'] = 1
ddf['x'] = ddf.x.cumsum()
ddf = ddf.set_index('x', sorted=True)
Run Code Online (Sandbox Code Playgroud)

这既不是很慢也不是免费的。

鉴于您的问题是如何表达的,我怀疑您只是想为每个分区创建一个范围,该范围由一个非常大的值分隔,您知道该值大于最大行数。你是对的,map_partitions没有提供分区号。您可以改为执行以下两种解决方案之一。

  1. 转换为 dask.array (使用.values),使用该map_blocks方法,该方法提供块索引,然后使用 转换回系列dd.from_array
  2. 转换为 dask.delayed 对象列表,自己创建延迟系列,然后使用以下命令转换回 dask 系列dd.from_delayed

http://dask.pydata.org/en/latest/delayed-collections.html