标签: dask-distributed

覆盖 dask 调度程序以同时在多个工作线程上加载数据

我想在我的分布式集群上运行图形/期货,这些集群都有一个“加载数据”根任务,然后是一堆在该数据上运行的训练任务。简化版本如下所示:

from dask.distributed import Client
client = Client(scheduler_ip)
load_data_future = client.submit(load_data_func, 'path/to/data/')
train_task_futures = [client.submit(train_func, load_data_future, params) 
                      for params in train_param_set]
Run Code Online (Sandbox Code Playgroud)

按照上面的方式运行此计划,调度程序会让一个工作人员读取该文件,然后将该数据溢出到磁盘以与其他工作人员共享。然而,加载数据通常是从一个大的HDF5文件中读取,这可以同时完成,所以我想知道是否有一种方法可以强制所有工作人员同时读取这个文件(他们都计算根任务)而不是让他们等待一名工作人员完成,然后缓慢地从该工作人员传输数据。

我知道有一种client.run()方法可以让所有工作人员同时读取文件,但是如何将读取的数据输入到下游任务中呢?

我无法使用 dask 数据原语同时读取 HDF5 文件,因为我需要多索引和多列分组等功能。

dask dask-distributed

5
推荐指数
1
解决办法
749
查看次数

Dask Distributed: Reading .csv from HDFS

I'm performance testing Dask using "Distributed Pandas on a Cluster with Dask DataFrames" as a guide.

In Matthew's example, he has a 20GB file and 64 workers (8 physical nodes).

In my case, I have a 82GB file and 288 workers (12 physical nodes; there's a HDFS data node on each).

On all 12 nodes, I can access HDFS and execute a simple Python script that displays info on a file:

import pyarrow as pa
fs = pa.hdfs.connect([url], 8022)
print(str(fs.info('/path/to/file.csv'))) …
Run Code Online (Sandbox Code Playgroud)

python hdfs dask dask-distributed

5
推荐指数
1
解决办法
2509
查看次数

如何在 Dask 中正确使用 client.scatter

当执行“大量”任务时,我收到此错误:

考虑使用 client.scatter 提前分散大型对象,以减轻调度程序负担并保留工作人员的数据

我还收到了一堆这样的消息:

tornado.application - ERROR - Exception in callback <bound method BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado object at 0x7f20d25e10b8>>
Traceback (most recent call last):
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/ioloop.py", line 907, in _run
    return self.callback()
  File "/home/muammar/.local/lib/python3.7/site-packages/bokeh/server/tornado.py", line 542, in _keep_alive
    c.send_ping()
  File "/home/muammar/.local/lib/python3.7/site-packages/bokeh/server/connection.py", line 80, in send_ping
    self._socket.ping(codecs.encode(str(self._ping_count), "utf-8"))
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/websocket.py", line 447, in ping
    raise WebSocketClosedError()
tornado.websocket.WebSocketClosedError
tornado.application - ERROR - Exception in callback <bound method BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado object at 0x7f20d25e10b8>>
Traceback (most recent call last):
  File "/home/muammar/.local/lib/python3.7/site-packages/tornado/ioloop.py", line …
Run Code Online (Sandbox Code Playgroud)

parallel-processing python-3.x dask dask-distributed

5
推荐指数
1
解决办法
6548
查看次数

Dask 工作线程的内存清理

我正在多节点分布式 Dask 集群上运行多个并行任务。然而,一旦任务完成,worker 仍然持有大量内存,集群很快就会被填满。

client.restart()我在每个任务之后都尝试过client.cancel(df),第一个任务会杀死工作人员并发送到CancelledError其他正在运行的任务,这很麻烦,第二个任务没有多大帮助,因为我们在 Dask 的函数中使用了很多自定义对象和函数map。添加del已知变量gc.collect()也没有多大帮助。

我确信大部分内存占用是由于自定义 python 函数和使用client.map(..).

我的问题是:

  1. 有没有一种从命令行或其他方式类似的方法trigger worker restart if no tasks are running right now
  2. 如果不是,这个问题有哪些可能的解决方案?我不可能避免 Dask 任务中的自定义对象和纯 python 函数。

python dask dask-distributed

5
推荐指数
1
解决办法
3097
查看次数

Streamz/Dask:收集不等待缓冲区的所有结果

进口:

from dask.distributed import Client
import streamz
import time
Run Code Online (Sandbox Code Playgroud)

模拟工作负载:

def increment(x):
    time.sleep(0.5)
    return x + 1
Run Code Online (Sandbox Code Playgroud)

假设我想在本地 Dask 客户端上处理一些工作负载:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).gather().sink(print)

        for i in range(10):
            ps.emit(i)
Run Code Online (Sandbox Code Playgroud)

这按预期工作,但sink(print)当然会强制等待每个结果,因此流不会并行执行。

但是,如果我使用buffer()允许缓存结果,则gather()似乎不再正确收集所有结果,并且解释器在获取结果之前退出。这种方法:

if __name__ == "__main__":
    with Client() as dask_client:
        ps = streamz.Stream()
        ps.scatter().map(increment).buffer(10).gather().sink(print)
                                     # ^
        for i in range(10):          # - allow parallel execution 
            ps.emit(i)               # - before gather()
Run Code Online (Sandbox Code Playgroud)

...为我打印任何结果。Python 解释器在启动脚本后不久、发出结果之前 …

python stream dask dask-distributed streamz

5
推荐指数
1
解决办法
443
查看次数

使用 docker-compose 的本地 dask 集群

我想创建一个包含我们公司分析工具链的 docker-compose.yml。为此,我添加了 dask。docker-compoe.yml 如下所示:

docker-compose.yml

version: '3'
services:
  jupyter:
    build: docker/jupyter/.
    ports:
      - "8899:8899"
    depends_on: 
      - dask-scheduler
      - dask-worker
    volumes:
      - ./notebooks:/notebooks

  dask-scheduler:
    build:
      docker/dask/.
    hostname: dask-scheduler
    ports:
      - "8786:8786"
      - "8787:8787"
    volumes:
      - ./notebooks:/notebooks
    command: ["dask-scheduler"]

  dask-worker:
    build:
      docker/dask/.
    depends_on:
      - dask-scheduler
    volumes:
      - ./notebooks:/notebooks
    command: ["dask-worker", "tcp://dask-scheduler:8786"]
Run Code Online (Sandbox Code Playgroud)

为了构建两个 dask 容器,我使用这个 Dockerfile:

docker/dask/Dockerfile

FROM python:3.7
RUN apt-get update -y && apt-get install -y python3-pip libsnappy-dev
RUN pip install numpy
RUN pip install dask
RUN pip install distributed
RUN pip …
Run Code Online (Sandbox Code Playgroud)

python docker docker-compose dask dask-distributed

5
推荐指数
1
解决办法
2547
查看次数

Dask 分布式 KeyError

我正在尝试使用一个小例子来学习 Dask。基本上我读入一个文件并计算行平均值。

from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(cores=4, memory='24 GB')

cluster.scale(4)

from dask.distributed import Client
client = Client(cluster)

import dask
import numpy as np
import dask.dataframe as dd

mytbl = dd.read_csv('test.txt', sep=' ')
row_mean = mytbl.loc[:, mytbl.columns != 'chrom'].apply(np.mean, axis=1, meta=(None, 'float64'))
row_mean = row_mean.compute()
Run Code Online (Sandbox Code Playgroud)

当我运行时compute,我可以在Dask仪表板中看到内存使用量增加非常快,并且所有CPU也都被使用。但随后内存增加停止,我看到这个错误:

distributed.utils - ERROR - "('read-csv-apply-67c8f79a72a7499f86a1707b269e0776', 0)"
Traceback (most recent call last):
  File "~/miniconda3/lib/python3.8/site-packages/distributed/utils.py", line 668, in log_errors
    yield
  File "~/miniconda3/lib/python3.8/site-packages/distributed/scheduler.py", line 3785, in add_worker
    typename=types[key],
KeyError: "('read-csv-apply-67c8f79a72a7499f86a1707b269e0776', 0)"
distributed.core - ERROR …
Run Code Online (Sandbox Code Playgroud)

python dask dask-distributed dask-dataframe dask-jobqueue

5
推荐指数
0
解决办法
1175
查看次数

Dask 计算随着时间的推移而减慢

我在使用 Dask 时遇到以下问题。我注意到随着时间的推移,相同的计算花费的时间越来越长。重新启动调度程序后,计算速度再次加快,但速度继续减慢。下图显示了我的计算所消耗的时间(我在循环中运行它们。更宽的空白来自于我运行循环几次并在它们之间有中断)。我还注意到每个工作人员的 CPU 使用率随着时间的推移而下降。

每次迭代消耗的时间不断增长

当我查看 Dask 仪表板时,我发现工作人员的 CPU 消耗随着时间的推移而下降 - 在第一次迭代中通常为 90-100%,在最后一次迭代中以 0-20% 结束。此外,在后期迭代中任务流似乎更加稀疏: 早期迭代与后期迭代中的任务流

可重现的例子

我使用dask容器daskdev/dask-notebook(Python 3.8、dask 2021.4.0、dask-core 2021.4.0、分布式2021.4.0、numpy 1.18.1、h5py 3.2.1)。在该容器中,我运行 2 个 jupyter 笔记本 - 一个用于运行调度程序,另一个用于运行计算。

准备数据

在其中一台笔记本中运行以下命令来创建数据。

import h5py
import numpy as np
path = "/home/jovyan/data.h5" 
x = np.random.randint(0, 2**15, size=(4096, 150000), dtype=np.int16)
with h5py.File(path, "w") as f:
    f.create_dataset("/x", data=x, chunks=(32, 150000))
Run Code Online (Sandbox Code Playgroud)

运行计算

在笔记本 1 中启动调度程序:

from dask.distributed import Client
c = Client()
c
Run Code Online (Sandbox Code Playgroud)

我的机器有 4 个核心(8 个线程)和 16GB 内存,因此这会创建 4 个工作线程,每个线程有 2 个线程和 3.8GB …

python docker dask dask-distributed

5
推荐指数
1
解决办法
1135
查看次数

在 Dask 中实施等宽间隔特征工程

在等宽离散化中,变量值被分配到相同宽度的区间。间隔的数量是用户定义的,宽度由最小/最大值和间隔的数量确定。

例如,给定值 10、20、100、130,最小值为 10,最大值为 130。如果用户将间隔数定义为 6,则给出以下公式:

区间宽度 = (Max(x) - Min(x)) / N

宽度为 (130 - 10) / 6 = 20

六个从零开始的区间是:[ 10, 30, 50, 70, 90, 110, 130]

最后,为数据集中的每个元素定义区间分配:

Value in the dataset    New feature engineered value
          10                      0
          20                      0
          57                      2 
         101                      4
         130                      5
Run Code Online (Sandbox Code Playgroud)

我有以下代码,它使用pandas数据框和sklean函数将数据框以等宽间隔划分:

from sklearn.preprocessing import KBinsDiscretizer
discretizer = KBinsDiscretizer(n_bins=10, encode='ordinal', strategy='uniform')
df['output_col'] = discretizer.fit_transform(df[['input_col']])
Run Code Online (Sandbox Code Playgroud)

这工作正常,但我需要实现一个等效的daskKBinsDiscretizer函数,该函数将在多个分区中并行触发该进程,并且我在dask_ml.preprocessing任何建议中找不到?我无法使用,map_partitions因为它将将该函数独立地应用于每个分区,并且我需要将间隔应用于整个数据帧。

scikit-learn dask dask-distributed dask-dataframe

5
推荐指数
1
解决办法
453
查看次数

对 Dask 数组的列应用函数

将函数应用于 Dask 数组的每一列的最有效方法是什么?如下所述,我已经尝试了很多方法,但我仍然怀疑我对 Dask 的使用相当业余。

\n

我有一个相当宽且相当长的数组,大小约为 3,000,000 x 10,000。我想将 ecdf 函数应用于该数组的每一列。堆叠在一起的各个列结果应生成与输入数组具有相同维度的数组。

\n

考虑以下测试,让我知道哪种方法是理想的方法或者我可以如何改进。我知道,我可以只使用最快的,但我真的想最大限度地利用 Dask 的可能性。阵列也可以大数倍。与此同时,我的基准测试结果令我感到惊讶。也许我没有正确理解 Dask 背后的逻辑。

\n
import numpy as np\nimport dask\nimport dask.array as da\nfrom dask.distributed import Client, LocalCluster\nfrom statsmodels.distributions.empirical_distribution import ECDF\n\n### functions\ndef ecdf(x):\n    fn = ECDF(x)\n    return fn(x)\n\ndef ecdf_array(X):\n\n    res = np.zeros_like(X)\n    for i in range(X.shape[1]):\n        res[:,i] = ecdf(X[:,i])\n        \n    return res\n\n### set up scheduler / workers\nn_workers = 10\ncluster = LocalCluster(n_workers=n_workers, threads_per_worker=4)\nclient = Client(cluster)\n\n### create data set \nX = da.random.random((100000,100)) #dask\nXarr = X.compute() #numpy\n\n### traditional for loop\n%timeit …
Run Code Online (Sandbox Code Playgroud)

python dask dask-delayed dask-distributed dask-dataframe

5
推荐指数
1
解决办法
682
查看次数