从客户端将本地文件加载到dask分布式集群上

Bot*_*Man 5 python python-3.x dask

有点初学者的问题,但是我没有找到相关的答案。

基本上,我有关(7gb)的数据位于本地计算机上。我已经在本地网络上运行了分布式集群。如何将该文件添加到群集中?

普通的dd.read_csv()或read_parquet()失败,因为工作进程无法在自己的环境中找到文件。

我是否需要手动将文件传输到集群中的每个节点?

注意:由于管理员的限制,我仅限于SFTP ...

MRo*_*lin 5

两种选择

网络文件系统

如注释中所建议,可以使用多种方法使用常规文件系统解决方案使本地文件可供群集中的其他计算机访问。如果您可以访问,这是一个不错的选择。

本地加载和分散

如果那行不通,那么您始终可以在本地加载数据并将其分散到群集的各个工作人员中。如果您的文件大于单台计算机的内存,则可能需要逐个执行此操作。

单程

如果一切都适合内存,那么我将正常加载数据,然后将其分散给工作人员。如果需要,您可以将其拆分出来,然后分发给其他工作人员:

import pandas
import dask.dataframe as dd
from dask.distributed import Client

client = Client('scheduler-address:8786')

df = pd.read_csv('myfile.csv')
future = client.scatter(df)  # send dataframe to one worker
ddf = dd.from_delayed([future], meta=df)  # build dask.dataframe on remote data
ddf = ddf.repartition(npartitions=20).persist()  # split
client.rebalance(ddf)  # spread around all of your workers
Run Code Online (Sandbox Code Playgroud)

多位

如果您有多个小文件,则可以迭代地加载和分散(也许在for循环中),然后从许多期货中创建dask.dataframe

futures = []
for fn in filenames:
    df = pd.read_csv(fn)
    future = client.scatter(df)
    futures.append(future)

ddf = dd.from_delayed(futures, meta=df)
Run Code Online (Sandbox Code Playgroud)

在这种情况下,您可能会跳过重新分区和重新平衡的步骤

如果您只有一个大文件,则可能需要自己进行一些拆分,或者使用 pd.read_csv(..., chunksize=...)