kad*_*ach 5 dask dask-distributed
我正在尝试在整个脚本周围添加一个进度指示器。但是,set_index(..., compute=False)仍然在调度程序上运行任务,可以在 Web 界面中观察到。
如何报告该set_index步骤的进度?
import dask.dataframe as dd
from dask.distributed import Client, progress
if __name__ == '__main__':
with Client() as client:
df = dd.read_csv('big.csv')
# I can see on the web interface that something is happening.
# This blocks 20-30s on this particular CSV.
df = df.set_index('id', compute=False)
# Progress reporting works from here
out = client.compute(
df
)
progress(out)
# out.result()
# ...
Run Code Online (Sandbox Code Playgroud)
不幸的是,没有简单的方法可以做到这一点。set_index急切地在内部调用compute,因为它需要扫描您的数据来决定新分区的边界应该是什么。无法控制此compute调用并让它使用进度条。如果您认为这值得向用户公开,您可以提出一个问题来进一步讨论。
或者(困难的方法),您可以自己计算 DataFrame 的划分(这样做时显示进度条),然后将它们传递到set_index,以防止set_index进行计算。
这里的缺点是你正在复制 dask 已有的代码,并且 dask 的方法有一些关于重新分区、检测预排序值和使用快速路径等的额外 逻辑。但在简单的情况下,这样的东西可能会起作用(注意这是不使用公共/记录的 API,因此它可能会在没有警告的情况下发生更改):
import dask
from dask.distributed import Client, progress
if __name__ == "__main__":
with Client() as client:
df = dask.datasets.timeseries()
# Calculate divisions manually to show a progress bar
from dask.dataframe.partitionquantiles import partition_quantiles
divisions_ddf = partition_quantiles(df["id"], df.npartitions)
future = client.compute(divisions_ddf)
print("Computing divisions")
progress(future)
divisions = future.result().tolist()
df = df.set_index("id", divisions=divisions, compute=False)
out = client.compute(df)
progress(out)
# out.result()
# ...
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
213 次 |
| 最近记录: |