标签: dask-distributed

修改 dask 数据框的安全且高效的方法

作为数据工作流的一部分,我需要修改 dask 数据框列子集中的值并传递结果以进行进一步计算。特别是,我对两种情况感兴趣:映射列和映射分区。对数据采取行动的推荐安全且高效的方式是什么?我在每个主机上有多个工作进程的集群上运行分布式设置

情况1。

我想跑:

res = dataframe.column.map(func, ...)
Run Code Online (Sandbox Code Playgroud)

这将返回一个数据系列,所以我假设原始数据帧没有被修改。将一列分配回数据框是否安全,例如dataframe['column']=res?可能不是。我应该使用.copy()制作一个副本,然后将结果分配给它,例如:

dataframe2 = dataframe.copy()
dataframe2['column'] = dataframe.column.map(func, ...)
Run Code Online (Sandbox Code Playgroud)

还有其他推荐的方法吗?

案例2

我需要映射数据帧的分区:

df.map_partitions(mapping_func, meta=df)
Run Code Online (Sandbox Code Playgroud)

mapping_func() 中,我想通过使用partition[column].map或简单地创建列表理解来修改所选列中的值。同样,如何安全地修改分区并从映射函数中返回它?

映射函数收到的分区是 Pandas 数据帧(原始数据的副本?),但是在就地修改数据时,我看到了一些崩溃(尽管没有异常/错误消息)。调用也是一样partition.copy(deep=False),它不起作用。分区应该被深度复制然后就地修改吗?或者我应该总是从新的/映射的列数据和原始/未修改的系列/列中构建一个新的数据框?

dask dask-distributed

5
推荐指数
1
解决办法
903
查看次数

dask 工作人员存储结果或文件的默认目录是什么?

[mapr@impetus-i0057 latest_code_deepak]$ dask-worker 172.26.32.37:8786
distributed.nanny - INFO -         Start Nanny at: 'tcp://172.26.32.36:50930'
distributed.diskutils - WARNING - Found stale lock file and directory '/home/mapr/latest_code_deepak/dask-worker-space/worker-PwEseH', purging
distributed.worker - INFO -       Start worker at:   tcp://172.26.32.36:41694
distributed.worker - INFO -          Listening to:   tcp://172.26.32.36:41694
distributed.worker - INFO -              bokeh at:          172.26.32.36:8789
distributed.worker - INFO -              nanny at:         172.26.32.36:50930
distributed.worker - INFO - Waiting to connect to:    tcp://172.26.32.37:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          8
distributed.worker - INFO -                Memory:                   33.52 GB …
Run Code Online (Sandbox Code Playgroud)

dask dask-delayed dask-distributed

5
推荐指数
1
解决办法
3430
查看次数

当计算速度比分布式 dask 中的传输速度快时,如何复制数据?

我有一个较大的对象 (150 MB),我需要向所有 dask 分布式工作人员广播它,以便它可以在未来的任务中使用。我尝试了几种方法:

  • Client.scatter(broadcast=True):这需要从一台机器(我在其中运行客户端和调度程序)发送所有数据,这会造成带宽瓶颈。
  • Client.submit其次是Client.replicate:这些工作人员共享一个文件系统,因此我可以安排加载数据的任务,然后将数据复制到所有工作人员,而不是发送数据。这似乎使用树策略来分发数据,这比之前的选项更快。

但是,强制每个工作人员在本地运行加载数据功能可能会更快,而不是在一个工作人员上加载数据并将其序列化到工作人员之间。有没有办法做到这一点? Client.run似乎是我想要的一部分,但我需要为加载的数据取回一个未来,我可以稍后将其传递给其他任务。

dask dask-distributed

5
推荐指数
1
解决办法
244
查看次数

在单个多核机器上索引大型 dask 数据帧时的内存使用情况

我正在尝试将Wikipedia CirrusSearch 转储转换为由 450G 16 核 GCP 实例上的标题索引的 Parquet 支持的 dask 数据帧。CirrusSearch 转储以单个 json 行格式文件的形式出现。英文 Wipedia 转储包含 500 万个 recard,压缩为 12G,扩展为 90+G。一个重要的细节是记录并不完全平坦。

最简单的方法是

import json
import dask
from  dask import bag as db, dataframe as ddf
from  toolz import curried as tz
from toolz.curried import operator as op

blocksize=2**24
npartitions='auto'
parquetopts=dict(engine='fastparquet', object_encoding='json')

lang = 'en'
wiki = 'wiki'
date = 20180625
path='./'

source = f'{path}{lang}{wiki}-{date}-cirrussearch-content.json'

(
 db
 .read_text(source, blocksize=blocksize)
 .map(json.loads)
 .filter(tz.flip(op.contains, 'title'))
 .to_dataframe()
 .set_index('title', npartitions=npartitions)
 .to_parquet(f'{lang}{wiki}-{date}-cirrussearch.pq', **parquetopts)
) …
Run Code Online (Sandbox Code Playgroud)

parquet dask fastparquet dask-distributed dask.distributed

5
推荐指数
1
解决办法
759
查看次数

使用 Dask 或 Joblib 并行 Sklearn 模型构建

我有一大组 sklearn 管道,我想与 Dask 并行构建。这是一个简单但幼稚的顺序方法:

from sklearn.naive_bayes import MultinomialNB 
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

iris = load_iris()
X_train, X_test, Y_train, Y_test = train_test_split(iris.data, iris.target, test_size=0.2)

pipe_nb = Pipeline([('clf', MultinomialNB())])
pipe_lr = Pipeline([('clf', LogisticRegression())])
pipe_rf = Pipeline([('clf', RandomForestClassifier())])

pipelines = [pipe_nb, pipe_lr, pipe_rf]  # In reality, this would include many more different types of models with varying but specific parameters

for pl in pipelines:
    pl.fit(X_train, Y_train) …
Run Code Online (Sandbox Code Playgroud)

python scikit-learn dask dask-distributed

5
推荐指数
1
解决办法
1399
查看次数

如何将多个参数传递给 dask.distributed.Client().map?

import dask.distributed
def f(x, y):
    return x, y
client = dask.distributed.Client()
client.map(f, [(1, 2), (2, 3)])
Run Code Online (Sandbox Code Playgroud)

不起作用。

[<Future: status: pending, key: f-137239e2f6eafbe900c0087f550bc0ca>,
 <Future: status: pending, key: f-64f918a0c730c63955da91694fcf7acc>]

distributed.worker - WARNING -  Compute Failed
Function:  f
args:      ((1, 2))
kwargs:    {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)

distributed.worker - WARNING -  Compute Failed
Function:  f
args:      ((2, 3))
kwargs:    {}
Exception: TypeError("f() missing 1 required positional argument: 'y'",)
Run Code Online (Sandbox Code Playgroud)

dask dask-distributed

5
推荐指数
2
解决办法
2387
查看次数

使用 dask 分布式时,Pycharm 调试器会抛出 Bad 文件描述符错误

我正在使用最轻量/简单的dask多处理,即非集群本地Client

from distributed import Client
client = Client()
Run Code Online (Sandbox Code Playgroud)

即便如此:第一个调用实例会产生dask.bag.compute()以下结果:

Connected to pydev debugger (build 191.7141.48)
Traceback (most recent call last):
  File "/Applications/PyCharm.app/Contents/helpers/pydev/_pydevd_bundle/pydevd_comm.py", line 383, in _on_run
    r = self.sock.recv(1024)
OSError: [Errno 9] Bad file descriptor
Traceback (most recent call last):
  File "/Applications/PyCharm.app/Contents/helpers/pydev/_pydevd_bundle/pydevd_comm.py", line 383, in _on_run
    r = self.sock.recv(1024)
OSError: [Errno 9] Bad file descriptor
Traceback (most recent call last):
  File "/Applications/PyCharm.app/Contents/helpers/pydev/_pydevd_bundle/pydevd_comm.py", line 383, in _on_run
    r = self.sock.recv(1024)
OSError: [Errno 9] Bad file …
Run Code Online (Sandbox Code Playgroud)

python pycharm dask dask-distributed

5
推荐指数
0
解决办法
1350
查看次数

如何正确使用dask的upload_file()将本地代码传递给worker

我有一个local_code.py文件中的函数,我想通过 dask 传递给工作人员。我在这里看到问题的答案说这可以使用该upload_file()函数来完成,但我似乎无法让它工作,因为我仍然得到一个ModuleNotFoundError.

代码的相关部分如下。

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

from local_code import *
helper_file = '/absolute/path/to/local_code.py'

def main():
    with SLURMCluster(**slurm_params) as cluster:

        cluster.scale(n_workers)

        with Client(cluster) as client:
            client.upload_file(helper_file)
            mapping = client.map(myfunc, data)
            client.gather(mapping)

if __name__ == '__main__':
    main()

Run Code Online (Sandbox Code Playgroud)

请注意,myfunc是从 导入的local_code,并且将其导入到地图没有错误。该函数myfunc还依赖于 中定义的其他函数local_code

使用此代码,我仍然收到此错误

distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04\x95+\x00\x00\x00\x00\x00\x00\x00\x8c\x11local_code\x94\x8c\x$
Traceback (most recent call last):
  File "/home/gallagher.r/.local/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 61, in loads
    return pickle.loads(x) …
Run Code Online (Sandbox Code Playgroud)

python dask dask-distributed

5
推荐指数
1
解决办法
1285
查看次数

Dask:持续提交,处理所有提交的数据

有 500 个,不断增长DataFrames,我想将(对于每个 DataFrame 独立的)数据的操作提交到dask. 我的主要问题是:可以dask保存不断提交的数据,所以我可以submit对所有提交的数据进行函数处理——而不仅仅是新提交的数据?

但是让我们用一个例子来解释它:

创建一个dask_server.py

from dask.distributed import Client, LocalCluster
HOST = '127.0.0.1'
SCHEDULER_PORT = 8711
DASHBOARD_PORT = ':8710'

def run_cluster():
    cluster = LocalCluster(dashboard_address=DASHBOARD_PORT, scheduler_port=SCHEDULER_PORT, n_workers=8)
    print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, DASHBOARD_PORT))
    client = Client(cluster)
    print(client)
    print("Press Enter to quit ...")
    input()

if __name__ == '__main__':
    run_cluster()
Run Code Online (Sandbox Code Playgroud)

现在,我可以从我的连接my_stream.py,并开始submitgather数据:

DASK_CLIENT_IP = '127.0.0.1'
dask_con_string = 'tcp://%s:%s' % (DASK_CLIENT_IP, DASK_CLIENT_PORT)
dask_client …
Run Code Online (Sandbox Code Playgroud)

python-3.x dask dask-distributed streamz

5
推荐指数
1
解决办法
216
查看次数

在 Azure 中部署容器集群

我有一个 Docker 应用程序,它在 Windows 上的笔记本电脑中运行良好,使用组合和启动容器的多个实例作为 Dask 集群。

服务的名称是“worker”,我启动了两个容器实例,如下所示:

docker compose up --scale worker=2
Run Code Online (Sandbox Code Playgroud)

我在 Azure 上部署了映像,当我运行 docker compose(使用我在 Windows 中使用的相同命令)时,只启动了一个容器。

如何在 Azure 中部署容器集群?我可以使用 docker compose 还是我需要采用不同的方法,例如使用模板或 Kubernetes 进行部署?

这是docker-compose.yml文件:

version: "3.0"
services:

  web:
    image: sofacr.azurecr.io/pablo:job2_v1
    volumes:
      - daskvol:/code/defaults_prediction      
    ports:
      - "5000:5000"
    environment:
      - SCHEDULER_ADDRESS=scheduler
      - SCHEDULER_PORT=8786
    working_dir: /code
    entrypoint:
      - /opt/conda/bin/waitress-serve
    command:
      - --port=5000
      - defaults_prediction:app

  scheduler:
    image: sofacr.azurecr.io/pablo:job2_v1
    ports:
      - "8787:8787"
    entrypoint:
      - /opt/conda/bin/dask-scheduler

  worker:
    image: sofacr.azurecr.io/pablo:job2_v1
    depends_on:
      - scheduler
    environment:
      - PYTHONPATH=/code
      - SCHEDULER_ADDRESS=scheduler …
Run Code Online (Sandbox Code Playgroud)

azure docker docker-compose dask dask-distributed

5
推荐指数
1
解决办法
208
查看次数