Why are computations so much slower in a Dask/Distributed worker?

blu*_*e10 7 python distributed dask

My computations run much more slowly in a Dask/Distributed worker than when I run them locally. I can reproduce the issue without any I/O, so I can rule out that it has anything to do with transferring data. The following code is a minimal reproducible example:

import time
import pandas as pd
import numpy as np
from dask.distributed import Client, LocalCluster


def gen_data(N=5000000):
    """ Dummy data generator """
    df = pd.DataFrame(index=range(N))
    for c in range(10):
        df[str(c)] = np.random.uniform(size=N)
    df["id"] = np.random.choice(range(100), size=len(df))
    return df


def do_something_on_df(df):
    """ Dummy computation that contains inplace mutations """
    for c in range(df.shape[1]):
        df[str(c)] = np.random.uniform(size=df.shape[0])
    return 42


def run_test():
    """ Test computation """
    df = gen_data()
    for key, group_df in df.groupby("id"):
        do_something_on_df(group_df)


class TimedContext(object):
    def __enter__(self):
        self.t1 = time.time()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.t2 = time.time()
        print(self.t2 - self.t1)

if __name__ == "__main__":
    client = Client("tcp://10.0.2.15:8786")

    with TimedContext():
        run_test()

    with TimedContext():
        client.submit(run_test).result()
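If no scheduler is running at that address, the same setup can presumably be reproduced with a LocalCluster instead; this is just a sketch, and the worker and thread counts below are my own arbitrary choices, not part of the measurement that follows:

from dask.distributed import Client, LocalCluster

# Sketch: start a local scheduler and worker instead of connecting to the
# hard-coded address above; worker/thread counts are arbitrary assumptions.
cluster = LocalCluster(n_workers=1, threads_per_worker=1)
client = Client(cluster)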

Running the test computation locally takes about 10 seconds, but inside Dask/Distributed it takes about 30 seconds. I also noticed that the Dask/Distributed worker prints a lot of log messages like:

distributed.core - WARNING - Event loop was unresponsive for 1.03s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.25s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.91s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.99s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.50s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.90s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 2.23s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
...

This is surprising, because it is not clear what would be holding the GIL in this example.

Why is there such a large performance difference? And what can I do to get the same performance in Dask/Distributed?

Disclaimer: self-answered for documentation purposes...

blu*_*e10 17

This behavior is the result of a very surprising behavior of pandas. By default, pandas' __setitem__ handler performs checks to detect chained assignment, which is what produces the famous SettingWithCopyWarning. When operating on a copy, these checks issue a call to gc.collect. As a result, code that uses __setitem__ excessively causes excessive gc.collect calls. This can have a noticeable impact on performance in general, but the problem is much worse inside a Dask/Distributed worker, because the garbage collector has to traverse many more Python data structures than in a standalone run. Most likely the hidden garbage collection calls are also the source of the GIL-holding warnings.
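One way to make the hidden collections visible (a sketch, not part of the original investigation; it only relies on the standard gc.callbacks hook) is to count how often the garbage collector runs while a __setitem__-on-a-copy loop executes:

import gc

import numpy as np
import pandas as pd

gc_runs = {"count": 0}

def count_gc(phase, info):
    # Called by the gc module at the start and end of every collection,
    # including explicit gc.collect() calls.
    if phase == "start":
        gc_runs["count"] += 1

gc.callbacks.append(count_gc)

# Small stand-in for the data and group loop from the question.
df = pd.DataFrame(np.random.uniform(size=(100000, 10)),
                  columns=[str(c) for c in range(10)])
df["id"] = np.random.choice(range(100), size=len(df))

for _, group_df in df.groupby("id"):
    group_df["0"] = np.random.uniform(size=len(group_df))  # __setitem__ on a copy

gc.callbacks.remove(count_gc)
print("collections observed:", gc_runs["count"])

If the chained-assignment check in the installed pandas version does trigger gc.collect, the counter ends up far higher than the handful of automatic collections one would otherwise expect.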

So the solution is to avoid these excessive gc.collect calls. There are two ways to do so (both are sketched in code right after this list):

  • Avoid using __setitem__ on copies: arguably the best solution, but it requires knowing where copies are created. In the example above, this can be achieved by changing the function call to do_something_on_df(group_df.copy()).
  • Disable the chained assignment checks: simply placing pd.options.mode.chained_assignment = None at the start of the computation also disables these gc.collect calls.
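A minimal sketch of both options, reusing gen_data() and do_something_on_df() exactly as defined in the question above:

import pandas as pd

# Option 1: hand the mutating helper an explicit copy, so the
# chained-assignment check (and its hidden gc.collect call) never fires.
def run_test_with_copy():
    df = gen_data()
    for key, group_df in df.groupby("id"):
        do_something_on_df(group_df.copy())

# Option 2: switch the chained-assignment check off for the whole
# computation; this also silences SettingWithCopyWarning.
def run_test_without_check():
    pd.options.mode.chained_assignment = None
    df = gen_data()
    for key, group_df in df.groupby("id"):
        do_something_on_df(group_df)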

In both cases the test computation then runs much faster than before, taking about 3.5 seconds both locally and under Dask/Distributed. This also gets rid of the GIL-holding warnings.

Note: I have filed an issue for this on GitHub.