Cod*_*ght 5 dask dask-distributed
我正在尝试将大型Dask数据帧分布在多台计算机上,以便在数据帧上进行(后来的)分布式计算。我为此使用dask-distributed。
我看到的所有分发的示例/文档都是从网络资源(hdfs,s3等)填充初始数据负载,并且似乎没有将DAG优化扩展到负载部分(似乎假设网络负载是在另一个问题的答案上强调了这一点:Dask是否与HDFS通信以优化数据局部性?
但是,我可以看到一些我们想要的情况。例如,如果我们在此数据库的节点上共存有一个分片数据库+ dask worker,我们希望将仅来自本地分片的记录强制填充到本地dask worker中。从文档/示例来看,网络冲突似乎是必须承担的成本。是否可以强制从特定工作人员获取单个数据框的一部分?
我尝试过的替代方法是,尝试强制每个工作程序运行一个函数(迭代地提交给每个工作程序),其中该函数仅加载该计算机/分片本地的数据。这行得通,而且我有一堆具有相同列模式的最佳本地数据框-但是,现在我没有单个数据框,而是n个数据框。是否可以跨多台机器合并/融合数据帧,以便只有一个数据帧引用,但是部分对特定机器具有亲和力(在一定范围内,由任务DAG决定)?
您可以生成 dask“集合”,例如来自 future 和延迟对象的数据帧,它们可以很好地相互操作。
对于每个分区,您知道哪台机器应该加载它,您可以生成一个 future,如下所示:
f = c.submit(make_part_function, args, workers={'my.worker.ip'})
Run Code Online (Sandbox Code Playgroud)
dask 客户端在哪里c,地址是您希望看到它发生的机器。您也可以给出allow_other_workers=True这是一个偏好而不是一个要求。
要从此类期货列表中创建数据框,您可以这样做
df = dd.from_delayed([dask.delayed(f) for f in futures])
Run Code Online (Sandbox Code Playgroud)
并最好提供一个meta=,给出预期数据帧的描述。现在,对给定分区的进一步操作将更愿意安排在已保存数据的同一工作线程上。
| 归档时间: |
|
| 查看次数: |
102 次 |
| 最近记录: |