dask.distributed LocalCluster 与线程与进程之间的区别

jri*_*ker 7 python dask dask-distributed

以下LocalCluster配置有dask.distributed什么区别?

Client(n_workers=4, processes=False, threads_per_worker=1)
Run Code Online (Sandbox Code Playgroud)

相对

Client(n_workers=1, processes=True, threads_per_worker=4)
Run Code Online (Sandbox Code Playgroud)

它们都有四个线程处理任务图,但第一个线程有四个工作线程。那么,与具有多个线程的单个工作人员相比,让多个工作人员充当线程有什么好处?

编辑:只是澄清一下,我知道进程、线程和共享内存之间的区别,所以这个问题更多地针对这两个客户端的配置差异。

mdu*_*ant 6

您在这里混淆了一些不同的事情:

  • 进程和线程数量之间的平衡,不同的混合有利于不同的工作负载。每个工作线程更多的线程意味着更好地共享内存资源并避免序列化;更少的线程和更多的进程意味着更好地避免 GIL

  • 使用 时processes=False,调度程序和工作线程都作为客户端在同一进程中的线程运行。同样,它们将共享内存资源,您甚至不必在客户端和调度程序之间序列化对象。但是,您将拥有许多线程,并且客户端的响应速度可能会降低。这通常用于测试,因为可以直接内省调度程序和工作对象。


Vic*_*aro 5

当您使用processes=False时,您将限制集群仅通过您的机器架构工作。

from dask.distributed import Client

# The address provided by processes=False is a In-process transport address. 
# This is used to perform communication between threads 
# Scheduler and workers are on the same machine.
client = Client(processes=False)
client
<Client: scheduler='inproc://10.0.0.168/31904/1' processes=1 cores=4>

# The address provided on processes=True is tcp protocol. 
# This is a network address. You can start workers from others machines
# just pointing the scheduler address to this tcp address 
# (All machines must be on the same network).
client = Client(processes=True)
client
<Client: scheduler='tcp://127.0.0.1:53592' processes=4 cores=4>
Run Code Online (Sandbox Code Playgroud)


jri*_*ker 5

我受到 Victor 和 Martin 的回答的启发而进行了更深入的挖掘,因此这里对我的理解进行了深入总结。(不能在评论中做到这一点)

首先,请注意此版本的 dask 中的调度程序打印输出不是很直观。processes实际上是worker的数量,cores实际上是所有worker中的线程总数。

其次,维克多关于 TCP 地址和添加/连接更多工人的评论很好地指出。我不确定是否可以将更多工作人员添加到具有 的集群中processes=False,但我认为答案可能是肯定的。

现在,考虑以下脚本:

from dask.distributed import Client

if __name__ == '__main__':
    with Client(processes=False) as client:  # Config 1
        print(client)
    with Client(processes=False, n_workers=4) as client:  # Config 2
        print(client)
    with Client(processes=False, n_workers=3) as client:  # Config 3
        print(client)
    with Client(processes=True) as client:  # Config 4
        print(client)
    with Client(processes=True, n_workers=3) as client:  # Config 5
        print(client)
    with Client(processes=True, n_workers=3,
                threads_per_worker=1) as client:  # Config 6
        print(client)
Run Code Online (Sandbox Code Playgroud)

这会dask为我的笔记本电脑(4 核)在2.3.0 版中产生以下输出:

<Client: scheduler='inproc://90.147.106.86/14980/1' processes=1 cores=4>
<Client: scheduler='inproc://90.147.106.86/14980/9' processes=4 cores=4>
<Client: scheduler='inproc://90.147.106.86/14980/26' processes=3 cores=6>
<Client: scheduler='tcp://127.0.0.1:51744' processes=4 cores=4>
<Client: scheduler='tcp://127.0.0.1:51788' processes=3 cores=6>
<Client: scheduler='tcp://127.0.0.1:51818' processes=3 cores=3>
Run Code Online (Sandbox Code Playgroud)

以下是我对配置之间差异的理解:

  1. 调度程序和所有工作程序都作为客户端进程中的线程运行。(正如 Martin 所说,这对于内省很有用。)因为既没有给出 worker 的数量也没有给出线程/worker 的数量,所以dask调用它的函数nprocesses_nthreads()来设置默认值(有processes=False1 个进程和线程等于可用内核)。
  2. 与 1 相同,但由于n_workers是给定的,因此由 dask 选择线程/工人,使得线程总数等于内核数(即 1)。同样,processes在打印输出中并不完全正确——它实际上是工作人员的数量(在这种情况下实际上是线程)。
  3. 与 2 相同,但由于n_workers不平均划分为内核数,因此 dask 选择 2 个线程/worker 进行过量使用而不是未充分使用。
  4. Client、Scheduler 和所有 worker 都是独立的进程。Dask 选择默认工作线程数(等于核心数,因为它 <= 4)和默认线程数/工作线程数 (1)。
  5. 与 5 相同的进程/线程配置,但由于与 3 相同的原因,总线程数过多。
  6. 这符合预期。