dask 的 set_index 的进度报告

kad*_*ach 5 dask dask-distributed

我正在尝试在整个脚本周围添加一个进度指示器。但是,set_index(..., compute=False)仍然在调度程序上运行任务,可以在 Web 界面中观察到。

如何报告该set_index步骤的进度?

import dask.dataframe as dd
from dask.distributed import Client, progress

if __name__ == '__main__':

  with Client() as client:

    df = dd.read_csv('big.csv')

    # I can see on the web interface that something is happening.
    # This blocks 20-30s on this particular CSV.
    df = df.set_index('id', compute=False)

    # Progress reporting works from here
    out = client.compute(
      df
    )
    progress(out)

    # out.result()
    # ...
Run Code Online (Sandbox Code Playgroud)

Gab*_*eph 2

不幸的是,没有简单的方法可以做到这一点。set_index急切地在内部调用compute,因为它需要扫描您的数据来决定新分区的边界应该是什么。无法控制此compute调用并让它使用进度条。如果您认为这值得向用户公开,您可以提出一个问题来进一步讨论。

或者(困难的方法),您可以自己计算 DataFrame 的划分(这样做时显示进度条),然后将它们传递到set_index,以防止set_index进行计算。

这里的缺点是你正在复制 dask 已有的代码,并且 dask 的方法有一些关于重新分区、检测预排序值和使用快速路径等的额外 逻辑。但在简单的情况下,这样的东西可能会起作用(注意这是不使用公共/记录的 API,因此它可能会在没有警告的情况下发生更改):

import dask
from dask.distributed import Client, progress

if __name__ == "__main__":

    with Client() as client:

        df = dask.datasets.timeseries()

        # Calculate divisions manually to show a progress bar
        from dask.dataframe.partitionquantiles import partition_quantiles

        divisions_ddf = partition_quantiles(df["id"], df.npartitions)
        future = client.compute(divisions_ddf)
        print("Computing divisions")
        progress(future)
        divisions = future.result().tolist()

        df = df.set_index("id", divisions=divisions, compute=False)

        out = client.compute(df)
        progress(out)

        # out.result()
        # ...
Run Code Online (Sandbox Code Playgroud)