我正在尝试在文件(版本 Dask 0.16.0 )中配置dask 工作空间config.yaml。
我尝试添加
local_dir: /path/to/ssd
Run Code Online (Sandbox Code Playgroud)
或者
local-directory: /path/to/ssd
Run Code Online (Sandbox Code Playgroud)
或者
local_directory: /path/to/ssd
Run Code Online (Sandbox Code Playgroud)
但我仍然dask-worker-space在 py 文件运行的同一目录中找到 。
我是否缺少dask_worker.py中的某些内容?
是否可以dask从 python 脚本运行?
在交互式会话中,我可以只写
from dask.distributed import Client
client = Client()
Run Code Online (Sandbox Code Playgroud)
如所有教程中所述。但是,如果我将这些行写入script.py文件并执行它python script.py,它会立即崩溃。
我找到了另一个选项,就是使用 MPI:
# script.py
from dask_mpi import initialize
initialize()
from dask.distributed import Client
client = Client() # Connect this local process to remote workers
Run Code Online (Sandbox Code Playgroud)
然后使用mpirun -n 4 python script.py. 这不会崩溃,但是如果您打印客户端
print(client)
# <Client: scheduler='tcp://137.250.37.84:35145' processes=0 cores=0>
Run Code Online (Sandbox Code Playgroud)
你看到没有使用内核,因此脚本永远运行而不做任何事情。
如何正确设置我的脚本?
我在一个目录中有 2000 个镶木地板文件。每个镶木地板文件的大小大约为 20MB。使用的压缩是 SNAPPY。每个镶木地板文件都有如下所示的行:
+------------+-----------+-----------------+
| customerId | productId | randomAttribute |
+------------+-----------+-----------------+
| ID1 | PRODUCT1 | ATTRIBUTE1 |
| ID2 | PRODUCT2 | ATTRIBUTE2 |
| ID2 | PRODUCT3 | ATTRIBUTE3 |
+------------+-----------+-----------------+
Run Code Online (Sandbox Code Playgroud)
每个列条目都是一个字符串。我正在使用具有以下配置的 p3.8xlarge EC2 实例:
我正在尝试以下代码:
+------------+-----------+-----------------+
| customerId | productId | randomAttribute |
+------------+-----------+-----------------+
| ID1 | PRODUCT1 | ATTRIBUTE1 |
| ID2 | PRODUCT2 | ATTRIBUTE2 |
| ID2 …Run Code Online (Sandbox Code Playgroud) from dask.distributed import Client
Client()
Client(do_not_spawn_new_if_default_address_in_use=True) # should not spawn a new default cluster
Run Code Online (Sandbox Code Playgroud)
这有可能以某种方式做吗?
我正在尝试执行以下矢量化 if-else 的等效操作,但找不到任何适用于 dask 的内容。(dask.array.where总是返回NotImplemented)
实现这一目标的最佳方法是什么?
np.where(df['columne'] > 0, 0, 1)
我有一个很大的 csv 文件,假设它看起来像这样
ID,PostCode,Value
H1A0A1-00,H1A0A1,0
H1A0A1-01,H1A0A1,0
H1A0A1-02,H1A0A1,0
H1A0A1-03,H1A0A1,0
H1A0A1-04,H1A0A1,1
H1A0A1-05,H1A0A1,0
H1A1G7-0,H1A1G7,0
H1A1G7-1,H1A1G7,0
H1A1G7-2,H1A1G7,0
H1A1N6-00,H1A1N6,0
H1A1N6-01,H1A1N6,0
H1A1N6-02,H1A1N6,0
H1A1N6-03,H1A1N6,0
H1A1N6-04,H1A1N6,0
H1A1N6-05,H1A1N6,0
...
Run Code Online (Sandbox Code Playgroud)
我想按邮政编码值将其拆分,并将具有相同邮政编码的所有行保存为 CSV。我努力了
postals = data['PostCode'].unique()
for p in postals:
df = data[data['PostCode'] == p]
df.to_csv(directory + '/output/demographics/' + p + '.csv', header=False, index=False)
Run Code Online (Sandbox Code Playgroud)
有没有办法使用 Dask 来利用多处理来做到这一点?谢谢
我在 s3 中有每月的 zarr 文件,这些文件具有网格化的温度数据。我想为一个纬度/经度提取多个月的数据并创建该时间序列的数据框。一些伪代码:
datasets=[]
for file in files:
s3 = s3fs.S3FileSystem()
zarr_store = s3fs.S3Map(file, s3=s3)
zarr = xr.open_zarr(store=zarr_store, consolidated=True)
ds = zarr.sel(latitude=lat,
longitude=long,
time=slice(start_date.strftime("%Y-%m-%d"),
end_date.strftime("%Y-%m-%d"))
)
datasets.append(ds)
con = xr.concat(datasets, dim='time')
df = con.to_dataframe()
Run Code Online (Sandbox Code Playgroud)
所以这段代码可以工作,但速度非常慢。我希望使用 dask 来加快速度。我的计划是更改一次处理一个文件并返回一个数据帧的方法。然后我会调用 client.map() 并生成所有 dfs,然后在最后将它们连接在一起。所以我结束了类似的事情:
def load(file, lat: float, long: float, start_date, end_date):
s3 = s3fs.S3FileSystem()
s3_path = file['s3_bucket'] + '/' + file['zarr_s3_key']
zarr_store = s3fs.S3Map(s3_path, s3=s3)
zarr = xr.open_zarr(store=zarr_store, consolidated=True)
ds = zarr.sel(latitude=lat,
longitude=long,
time=slice(start_date.strftime("%Y-%m-%d"),
end_date.strftime("%Y-%m-%d"))
)
tmp = x.result().to_array().values
df_time = …Run Code Online (Sandbox Code Playgroud) 我在我的机器上本地使用 Dask 和 Docker Compose (OSX Catalina 10.15.17)。我的 docker-compose 文件与此相同,只是我在笔记本容器中添加了一个卷。
我可以在默认地址正确访问仪表板localhost:8787,但是当我启动一些任务时,即使用 xarray 或类似的东西:
import dask.array as da
x = da.random.random((1000,1000,10), chunks=(1000,1000,5))
y = da.random.random((1000,1000,10), chunks=(1000,1000,5))
z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1,))
z.compute()
Run Code Online (Sandbox Code Playgroud)
仪表板中没有显示任何内容:
我尝试过使用 LocalCluster 启动
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
client
Run Code Online (Sandbox Code Playgroud)
然后使用左侧的 Jupyter Lab Dask 扩展从那里启动集群。
你有什么建议吗?我错过了什么吗?
干杯
我希望从 azure blob 中读取大量小文件,这可能是 1k-100k 个文件的顺序,总共有几个 1TB。我必须在 python 中处理这些文件,它本身的处理并不繁重,但是从 blob 中读取文件确实需要时间。另一个限制是,当我处理第一个文件时,新文件已被写入。
我正在寻找执行此操作的选项,是否可以使用 dask 从 blob 并行读取多个文件?或者是否可以在 azure 网络内每小时传输和加载超过 1tb 的数据?
我正在尝试使用它dask来将一个巨大的制表符分隔文件拆分为包含 100,000 个核心的 AWS Batch 阵列上的较小块。
在 AWS Batch 中,每个核心都有一个唯一的环境变量,AWS_BATCH_JOB_ARRAY_INDEX范围从 0 到 99,999(复制到idx下面代码片段中的变量中)。因此,我尝试使用以下代码:
import os
import dask.dataframe as dd
idx = int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"])
df = dd.read_csv(f"s3://main-bucket/workdir/huge_file.tsv", sep='\t')
df = df.repartition(npartitions=100_000)
df = df.partitions[idx]
df = df.persist() # this call isn't needed before calling to df.to_csv (see comment by Sultan)
df = df.compute() # this call isn't needed before calling to df.to_csv (see comment by Sultan)
df.to_csv(f"/tmp/split_{idx}.tsv", sep="\t", index=False)
print(idx, df.shape, df.head(5))
Run Code Online (Sandbox Code Playgroud)
我需要先打电话presist和/或compute …