如何将 Pandas 数据帧传递给分布式工作者?

kei*_*fly 3 python distributed pandas dask

我试图将一个大熊猫数据帧作为函数参数传递给分布式 dask 的工作人员。我尝试过的(X 是我的数据框):

1 将数据直接传递给函数:

def test(X):
    return X
f=client.submit(test, X)
f.result()
Run Code Online (Sandbox Code Playgroud)

2 在初始化函数中保存数据帧。

def worker_init(r_X):
    global X
    X=r_X
client.run(worker_init,X,y)
Run Code Online (Sandbox Code Playgroud)

3 将数据帧分散到所有节点,然后通过期货使用它

def test(X):
    return X
f_X = client.scatter(X, broadcast=True)
f = client.submit(test,f_X)
f.result()
Run Code Online (Sandbox Code Playgroud)

没有一个变体适用于我的情况。变体 1 和 2 的工作方式几乎相同。dask-scheduler 为每个任务增加内存,并且永远不会释放它,直到它耗尽内存并且任务失败。

变体 3 不起作用,因为我没有传递 Pandas 数据帧,而是得到了一些垃圾。

如何将数据帧发送给工作人员并且在调度程序上没有 MemoryError?

变体 3 的完整代码应该是内存高效的,但甚至不传递数据帧:

import pandas as pd
import numpy as np
from distributed import Client
client = Client('localhost:8786')
X = np.random.rand(10000,100)
X=pd.DataFrame(X)
f_X = client.scatter(X, broadcast=True)
def test(X):
    return X
f = client.submit(test,f_X)
f.result()[:10]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Run Code Online (Sandbox Code Playgroud)

mdu*_*ant 5

client.scatter检查输入列表,因此当您传入数据框时,您不小心将其解包为系列列表。你应该做 f_X = client.scatter([X], broadcast=True)

现在你在每个工人身上都有一个数据框。这里 f_X 也是一个列表,包含一个未来,所以你会想要f = client.submit(test,f_X[0]).

一般来说,您会更好,因为您可以在工作人员的函数中生成/加载您的数据,而不是从您的客户端传递它们,这显然需要将整个内容放入本地内存,复制该数据,以及一路上的序列化成本。

  • 从`distributed.__version__ == 1.16.3` 开始,scatter 适当地接受单例参数 (2认同)