Dask 表演:工作流程疑虑

rpa*_*nai 4 dask dask-distributed

我对如何从 dask 中获得最佳效果感到困惑。

问题 我有一个包含多个时间序列的数据帧(每个都有自己的时间序列key),我需要my_fun在每个时间序列上运行一个函数。使用 Pandas 解决它的一种方法涉及 df = list(df.groupby("key"))然后应用my_fun 多处理。尽管 RAM 使用量很大,但性能在我的机器上非常好,而在谷歌云计算上却很糟糕。

在 Dask 我目前的工作流程是:

import dask.dataframe as dd
from dask.multiprocessing import get
Run Code Online (Sandbox Code Playgroud)
  1. 从 S3 读取数据。14 个文件 -> 14 个分区
  2. `df.groupby("key").apply(my_fun).to_frame.compute(get=get)

因为我没有设置索引df.known_divisionsFalse

结果图是 在此处输入图片说明 我不明白我所看到的是否是瓶颈。

问题:

  1. df.npartitions作为倍数更好ncpu还是无关紧要?
  2. 这个似乎是更好地设置索引的关键。我的猜测是我可以做类似的事情

    df["key2"] = df["key"] df = df.set_index("key2")

但是,同样,我不知道这是否是最好的方法。

mdu*_*ant 5

对于像 Dask 中的“什么需要时间”这样的问题,通常建议您使用“分布式”调度程序而不是多处理 - 您可以运行任意数量的进程/线程,但您可以通过诊断获得更多信息仪表盘。

对于您的特定问题,如果您对一个没有很好地在分区之间拆分的列进行分组并应用除简单聚合之外的任何内容,您将不可避免地需要洗牌。设置索引会作为显式步骤为您执行此洗牌,或者您会在任务图中得到明显的隐式洗牌。这是一个多对多的操作,每个聚合任务都需要来自每个原始分区的输入,因此是瓶颈。没有办法解决这个问题。

至于分区数量,是的,您可以有次优条件,例如 8 个内核上的 9 个分区(您将计算 8 个任务,然后可能会在一个内核上阻塞最终任务,而其他内核则处于空闲状态);但总的来说,只要您不使用非常少量的分区,您就可以依靠 dask 来做出合理的调度决策。在许多情况下,这无关紧要。