标签: dask

无法配置本地集群 dask 工作目录

我正在尝试在文件(版本 Dask 0.16.0 )中配置dask 工作空间config.yaml
我尝试添加

local_dir: /path/to/ssd
Run Code Online (Sandbox Code Playgroud)

或者

local-directory: /path/to/ssd
Run Code Online (Sandbox Code Playgroud)

或者

local_directory: /path/to/ssd
Run Code Online (Sandbox Code Playgroud)

但我仍然dask-worker-space在 py 文件运行的同一目录中找到 。

我是否缺少dask_worker.py中的某些内容?

dask

1
推荐指数
1
解决办法
1231
查看次数

使用脚本中的 Dask

是否可以dask从 python 脚本运行?

在交互式会话中,我可以只写

from dask.distributed import Client
client = Client()
Run Code Online (Sandbox Code Playgroud)

如所有教程中所述。但是,如果我将这些行写入script.py文件并执行它python script.py,它会立即崩溃。

我找到了另一个选项,就是使用 MPI:

# script.py
from dask_mpi import initialize
initialize()

from dask.distributed import Client
client = Client()  # Connect this local process to remote workers
Run Code Online (Sandbox Code Playgroud)

然后使用mpirun -n 4 python script.py. 这不会崩溃,但是如果您打印客户端

print(client)
# <Client: scheduler='tcp://137.250.37.84:35145' processes=0 cores=0> 
Run Code Online (Sandbox Code Playgroud)

你看到没有使用内核,因此脚本永远运行而不做任何事情。

如何正确设置我的脚本?

python-3.x dask dask-distributed

1
推荐指数
1
解决办法
590
查看次数

CUDF 错误处理大量镶木地板文件

我在一个目录中有 2000 个镶木地板文件。每个镶木地板文件的大小大约为 20MB。使用的压缩是 SNAPPY。每个镶木地板文件都有如下所示的行:

+------------+-----------+-----------------+
| customerId | productId | randomAttribute |
+------------+-----------+-----------------+
| ID1        | PRODUCT1  | ATTRIBUTE1      |
| ID2        | PRODUCT2  | ATTRIBUTE2      |
| ID2        | PRODUCT3  | ATTRIBUTE3      |
+------------+-----------+-----------------+
Run Code Online (Sandbox Code Playgroud)

每个列条目都是一个字符串。我正在使用具有以下配置的 p3.8xlarge EC2 实例:

  • 内存:244GB
  • 虚拟 CPU:32
  • GPU RAM:64GB(每个GPU核心有16GB RAM)
  • GPU : 4 Tesla V100

我正在尝试以下代码:

+------------+-----------+-----------------+
| customerId | productId | randomAttribute |
+------------+-----------+-----------------+
| ID1        | PRODUCT1  | ATTRIBUTE1      |
| ID2        | PRODUCT2  | ATTRIBUTE2      |
| ID2 …
Run Code Online (Sandbox Code Playgroud)

python nvidia parquet dask cudf

1
推荐指数
1
解决办法
1290
查看次数

Dask Client 检测到本地默认集群已经在运行

from dask.distributed import Client

Client()
Client(do_not_spawn_new_if_default_address_in_use=True)  # should not spawn a new default cluster
Run Code Online (Sandbox Code Playgroud)

这有可能以某种方式做吗?

dask dask-distributed

1
推荐指数
1
解决办法
752
查看次数

numpy where 的 dask 等价物是什么?

我正在尝试执行以下矢量化 if-else 的等效操作,但找不到任何适用于 dask 的内容。(dask.array.where总是返回NotImplemented

实现这一目标的最佳方法是什么?

np.where(df['columne'] > 0, 0, 1)

python dask

1
推荐指数
1
解决办法
1308
查看次数

使用 Dask 根据列值保存多个 csv 文件

我有一个很大的 csv 文件,假设它看起来像这样

ID,PostCode,Value
H1A0A1-00,H1A0A1,0
H1A0A1-01,H1A0A1,0
H1A0A1-02,H1A0A1,0
H1A0A1-03,H1A0A1,0
H1A0A1-04,H1A0A1,1
H1A0A1-05,H1A0A1,0
H1A1G7-0,H1A1G7,0
H1A1G7-1,H1A1G7,0
H1A1G7-2,H1A1G7,0
H1A1N6-00,H1A1N6,0
H1A1N6-01,H1A1N6,0
H1A1N6-02,H1A1N6,0
H1A1N6-03,H1A1N6,0
H1A1N6-04,H1A1N6,0
H1A1N6-05,H1A1N6,0
...
Run Code Online (Sandbox Code Playgroud)

我想按邮政编码值将其拆分,并将具有相同邮政编码的所有行保存为 CSV。我努力了

postals = data['PostCode'].unique()
for p in postals:
    df = data[data['PostCode'] == p]
    df.to_csv(directory + '/output/demographics/' + p + '.csv', header=False, index=False)
Run Code Online (Sandbox Code Playgroud)

有没有办法使用 Dask 来利用多处理来做到这一点?谢谢

python csv dask

1
推荐指数
1
解决办法
984
查看次数

如何使用 dask 和 xarray 加载和处理 zarr 文件

我在 s3 中有每月的 zarr 文件,这些文件具有网格化的温度数据。我想为一个纬度/经度提取多个月的数据并创建该时间序列的数据框。一些伪代码:

datasets=[]
for file in files:
    s3 = s3fs.S3FileSystem()        
    zarr_store = s3fs.S3Map(file, s3=s3)
    zarr = xr.open_zarr(store=zarr_store, consolidated=True)
    ds = zarr.sel(latitude=lat,
                  longitude=long,
                  time=slice(start_date.strftime("%Y-%m-%d"),
                             end_date.strftime("%Y-%m-%d"))
                       )
    datasets.append(ds)

con = xr.concat(datasets, dim='time')
df = con.to_dataframe()
Run Code Online (Sandbox Code Playgroud)

所以这段代码可以工作,但速度非常慢。我希望使用 dask 来加快速度。我的计划是更改一次处理一个文件并返回一个数据帧的方法。然后我会调用 client.map() 并生成所有 dfs,然后在最后将它们连接在一起。所以我结束了类似的事情:

def load(file, lat: float, long: float, start_date, end_date):

    s3 = s3fs.S3FileSystem()
    s3_path = file['s3_bucket'] + '/' + file['zarr_s3_key']
    zarr_store = s3fs.S3Map(s3_path, s3=s3)
    zarr = xr.open_zarr(store=zarr_store, consolidated=True)

    ds = zarr.sel(latitude=lat,
                  longitude=long,
                  time=slice(start_date.strftime("%Y-%m-%d"),
                             end_date.strftime("%Y-%m-%d"))
                 )

    tmp = x.result().to_array().values
    df_time = …
Run Code Online (Sandbox Code Playgroud)

python dask python-xarray zarr

1
推荐指数
1
解决办法
777
查看次数

带 docker 的 Dask 在 dask 仪表板中未显示任何内容

我在我的机器上本地使用 Dask 和 Docker Compose (OSX Catalina 10.15.17)。我的 docker-compose 文件与此相同,只是我在笔记本容器中添加了一个卷。
我可以在默认地址正确访问仪表板localhost:8787,但是当我启动一些任务时,即使用 xarray 或类似的东西:

import dask.array as da
x = da.random.random((1000,1000,10), chunks=(1000,1000,5))
y = da.random.random((1000,1000,10), chunks=(1000,1000,5))
z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1,))

z.compute()
Run Code Online (Sandbox Code Playgroud)

仪表板中没有显示任何内容:

空仪表板

我尝试过使用 LocalCluster 启动

from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
client
Run Code Online (Sandbox Code Playgroud)

然后使用左侧的 Jupyter Lab Dask 扩展从那里启动集群。

你有什么建议吗?我错过了什么吗?

干杯

docker docker-compose dask

1
推荐指数
1
解决办法
1139
查看次数

从 azure blob 并行读取多个文件

我希望从 azure blob 中读取大量小文件,这可能是 1k-100k 个文件的顺序,总共有几个 1TB。我必须在 python 中处理这些文件,它本身的处理并不繁重,但是从 blob 中读取文件确实需要时间。另一个限制是,当我处理第一个文件时,新文件已被写入。

我正在寻找执行此操作的选项,是否可以使用 dask 从 blob 并行读取多个文件?或者是否可以在 azure 网络内每小时传输和加载超过 1tb 的数据?

python azure dask

1
推荐指数
1
解决办法
3003
查看次数

如何使用 dask 分割大型 .csv 文件?

我正在尝试使用它dask来将一个巨大的制表符分隔文件拆分为包含 100,000 个核心的 AWS Batch 阵列上的较小块。

在 AWS Batch 中,每个核心都有一个唯一的环境变量,AWS_BATCH_JOB_ARRAY_INDEX范围从 0 到 99,999(复制到idx下面代码片段中的变量中)。因此,我尝试使用以下代码:

import os
import dask.dataframe as dd

idx = int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"])

df = dd.read_csv(f"s3://main-bucket/workdir/huge_file.tsv", sep='\t')
df = df.repartition(npartitions=100_000)
df = df.partitions[idx]

df = df.persist() # this call isn't needed before calling to df.to_csv (see comment by Sultan)
df = df.compute() # this call isn't needed before calling to df.to_csv (see comment by Sultan)
df.to_csv(f"/tmp/split_{idx}.tsv", sep="\t", index=False)
print(idx, df.shape, df.head(5))
Run Code Online (Sandbox Code Playgroud)

我需要先打电话presist和/或compute …

python csv dask aws-batch

1
推荐指数
1
解决办法
1707
查看次数