blu*_*e10 7 python distributed dask
My computation runs much more slowly on Dask/Distributed workers than when I run it locally. I can reproduce the slowdown without any I/O, so I can rule out that it has to do with transferring data. The following code is a minimal reproduction example:
import time
import pandas as pd
import numpy as np
from dask.distributed import Client, LocalCluster


def gen_data(N=5000000):
    """ Dummy data generator """
    df = pd.DataFrame(index=range(N))
    for c in range(10):
        df[str(c)] = np.random.uniform(size=N)
    df["id"] = np.random.choice(range(100), size=len(df))
    return df


def do_something_on_df(df):
    """ Dummy computation that contains inplace mutations """
    for c in range(df.shape[1]):
        df[str(c)] = np.random.uniform(size=df.shape[0])
    return 42


def run_test():
    """ Test computation """
    df = gen_data()
    for key, group_df in df.groupby("id"):
        do_something_on_df(group_df)


class TimedContext(object):
    def __enter__(self):
        self.t1 = time.time()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.t2 = time.time()
        print(self.t2 - self.t1)


if __name__ == "__main__":
    client = Client("tcp://10.0.2.15:8786")

    with TimedContext():
        run_test()

    with TimedContext():
        client.submit(run_test).result()
Running the test computation locally takes roughly 10 seconds, but within Dask/Distributed it takes roughly 30 seconds. I also noticed that the Dask/Distributed workers emit a lot of log messages like:
distributed.core - WARNING - Event loop was unresponsive for 1.03s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.25s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.91s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.99s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.50s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 1.90s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - WARNING - Event loop was unresponsive for 2.23s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
...
This is surprising, because it is not obvious what would be holding the GIL in this example.
Why is there such a big performance difference, and what can I do to get the same performance?
Disclaimer: self-answered for documentation purposes...
blu*_*e10 17
This behavior is the result of a rather surprising design decision in Pandas. By default, Pandas' __setitem__ handler performs checks to detect chained assignment, which lead to the famous SettingWithCopyWarning. When operating on a copy, these checks issue a call to gc.collect. Code that uses __setitem__ excessively therefore triggers excessive gc.collect calls. This can have a noticeable impact on performance in general, but the problem is much worse on Dask/Distributed workers, because the garbage collector has to traverse many more Python data structures than in a standalone run. Most likely these hidden garbage-collection calls are also the source of the GIL-holding warnings.
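The hidden collections can be made visible with the standard-library gc.callbacks hook. Below is a minimal sketch, assuming a pandas version in which the chained-assignment check still calls gc.collect internally (newer releases have removed that call, so both counts may look similar there); the counter and the toy DataFrame are illustration only and not part of the original example.

import gc
import numpy as np
import pandas as pd

# Every callable in gc.callbacks is invoked with ("start" or "stop", info)
# around each garbage-collection pass, so we can simply count passes.
collections = 0

def count_collections(phase, info):
    global collections
    if phase == "start":
        collections += 1

gc.callbacks.append(count_collections)

df = pd.DataFrame({str(c): np.random.uniform(size=500_000) for c in range(10)})
df["id"] = np.random.choice(range(100), size=len(df))

# Assigning to groupby slices, as in the question: each __setitem__ runs the
# chained-assignment check, which on affected pandas versions calls gc.collect.
collections = 0
for _, group_df in df.groupby("id"):
    group_df["0"] = np.random.uniform(size=len(group_df))
print("collections when assigning to slices:", collections)

# Assigning to explicit copies: the check returns early, so pandas adds no
# extra gc.collect calls (only the interpreter's regular threshold-based ones).
collections = 0
for _, group_df in df.groupby("id"):
    group_copy = group_df.copy()
    group_copy["0"] = np.random.uniform(size=len(group_copy))
print("collections when assigning to copies:", collections)

gc.callbacks.remove(count_collections)

On an affected pandas version the first count should be dramatically higher than the second.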
The solution therefore is to avoid these excessive gc.collect calls. There are two ways to do so:
- Use __setitem__ only on copies: arguably the best solution, but it requires knowing where copies are produced. In the example above, this can be achieved by changing the function call to do_something_on_df(group_df.copy()).
- Set pd.options.mode.chained_assignment = None at the beginning of the computation, which also disables these gc.collect calls (at the cost of silencing SettingWithCopyWarning).

In both cases the test computation runs much faster than before, taking roughly 3.5 seconds both locally and under Dask/Distributed. This also makes the GIL-holding warnings disappear.
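For reference, a minimal sketch of both workarounds applied to run_test from the question. It assumes gen_data and do_something_on_df are defined exactly as above; the two function names below are new and only for illustration.

import pandas as pd

# Option 1: pass an explicit copy into the mutating function, so the
# chained-assignment check never fires on a slice of the original frame.
def run_test_with_copies():
    df = gen_data()
    for key, group_df in df.groupby("id"):
        do_something_on_df(group_df.copy())

# Option 2: disable the chained-assignment check globally. This removes the
# hidden gc.collect calls, but real chained-assignment mistakes will no
# longer be reported via SettingWithCopyWarning.
def run_test_without_checks():
    pd.options.mode.chained_assignment = None
    df = gen_data()
    for key, group_df in df.groupby("id"):
        do_something_on_df(group_df)

Submitting either variant via client.submit should bring the distributed timing roughly in line with the local one.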
Note: I have filed this issue on GitHub.