如何用dask执行多线程`merge()`?如何通过q​​sub使用多核?

Sha*_*ang 6 python multithreading cluster-computing pandas dask

我刚刚开始使用dask,我仍然在根本上混淆了如何使用多个线程或使用集群执行简单的pandas任务.

让我们pandas.merge()daskdataframes.

import dask.dataframe as dd

df1 = dd.read_csv("file1.csv")
df2 = dd.read_csv("file2.csv")

df3 = dd.merge(df1, df2)
Run Code Online (Sandbox Code Playgroud)

现在,假设我要在我的笔记本电脑上运行这个,有4个内核.如何为此任务分配4个线程?

看来正确的方法是:

dask.set_options(get=dask.threaded.get)
df3 = dd.merge(df1, df2).compute()
Run Code Online (Sandbox Code Playgroud)

这将使用尽可能多的线程(即,存在多个具有笔记本电脑共享内存的内核,4)?如何设置线程数?

假设我在一个拥有100个核心的工厂.如何以与将作业提交到群集相同的方式提交此内容qsub?(类似于通过MPI在集群上运行任务?)

dask.set_options(get=dask.threaded.get)
df3 = dd.merge(df1, df2).compute
Run Code Online (Sandbox Code Playgroud)

MRo*_*lin 4

单机调度

默认情况下,Dask.dataframe 将使用线程调度程序,其线程数与计算机中的逻辑核心数相同。

正如注释中所指出的,您可以使用方法的关键字参数来控制线程数或 Pool 实现.compute()

分布式机器调度

您可以使用dask.distributed在集群中的多个节点上部署 dask 工作线程。一种方法qsub是在dask-scheduler本地启动:

$ dask-scheduler
Scheduler started at 192.168.1.100:8786
Run Code Online (Sandbox Code Playgroud)

然后使用qsub启动许多dask-worker进程,指向报告的地址:

$ qsub dask-worker 192.168.1.100:8786 ... <various options>
Run Code Online (Sandbox Code Playgroud)

截至昨天,有一个实验包可以在任何启用 DRMAA 的系统(包括 SGE/qsub 类系统)上执行此操作: https: //github.com/dask/dask-drmaa

完成此操作后,您可以创建一个dask.distributed.Client对象,该对象将作为默认调度程序

from dask.distributed import Client
c = Client('192.168.1.100:8786')  # now computations run by default on the cluster
Run Code Online (Sandbox Code Playgroud)

多线程性能

请注意,截至 Pandas 0.19 版本,GIL 仍未发布pd.merge,因此我不期望使用多线程来大幅提升速度。如果这对您很重要,那么我建议在这里发表评论: https: //github.com/pandas-dev/pandas/issues/13745