标签: dask-distributed

达斯:数据框组上的唯一方法

我想知道在与Dask进行groupBy聚合后,是否有可能从给定的列中获得多个唯一项。我在文档中看不到任何类似信息。它在pandas数据框上可用,并且非常有用。我已经看到一些与此相关的问题,但是我不确定它是否已实现。

有人可以给我一些提示吗?

python dask dask-distributed

5
推荐指数
1
解决办法
1254
查看次数

dask.distributed中的信号量?

我有一个带有n个worker的dask集群,并希望worker对数据库进行查询.但是数据库只能并行处理m个查询,其中m <n.我如何在dask.distributed中对其进行建模?只有m个工人应该并行完成这项任务.

我已经看到分布式支持锁(http://distributed.readthedocs.io/en/latest/api.html#distributed.Lock).但有了这个,我只能并行执行一个查询,而不是m.

我也看到我可以为每个工人定义资源(https://distributed.readthedocs.io/en/latest/resources.html).但这也不合适,因为数据库独立于工人.我要么必须为每个worker定义1个数据库资源(这会导致太多的并行查询).或者我必须将m个数据库资源分配给n个工作者,这在设置集群和执行中次优时很困难.

是否可以在dask中定义类似信号量的东西来解决这个问题?

dask dask-distributed

5
推荐指数
1
解决办法
138
查看次数

基于列或函数的Dask数据框拆分分区

我最近开始在Dask寻找大数据。我对有效并行应用操作有疑问。

说我有一些这样的销售数据:

customerKey产品Key交易Key总销售净销售额单位销售量交易日期
----------- -------------- ---------------- --------- --------- ---------- ------ --------------------
    20353 189 219548 0.921058 0.921058 1 1 2017-02-01 00:00:00
  2596618 189 215015 0.709997 0.709997 1 1 2017-02-01 00:00:00
 30339435 189 215184 0.918068 0.918068 1 1 2017-02-01 00:00:00
 32714675 189 216656 0.751007 0.751007 1 1 2017-02-01 00:00:00
 39232537 189 218180 0.752392 0.752392 1 1 2017-02-01 00:00:00
 41722826 189 216806 0.0160143 0.0160143 1 1 2017-02-01 00:00:00
 46525123 189 219875 0.469437 0.469437 1 1 2017-02-01 00:00:00
 51024667 189 215457 0.244886 0.244886 1 …

python dataframe pandas dask dask-distributed

5
推荐指数
1
解决办法
2905
查看次数

在Dask数据框子集上强制局部性

我正在尝试将大型Dask数据帧分布在多台计算机上,以便在数据帧上进行(后来的)分布式计算。我为此使用dask-distributed。

我看到的所有分发的示例/文档都是从网络资源(hdfs,s3等)填充初始数据负载,并且似乎没有将DAG优化扩展到负载部分(似乎假设网络负载是在另一个问题的答案上强调了这一点:Dask是否与HDFS通信以优化数据局部性?

但是,我可以看到一些我们想要的情况。例如,如果我们在此数据库的节点上共存有一个分片数据库+ dask worker,我们希望将仅来自本地分片的记录强制填充到本地dask worker中。从文档/示例来看,网络冲突似乎是必须承担的成本。是否可以强制从特定工作人员获取单个数据框的一部分?

我尝试过的替代方法是,尝试强制每个工作程序运行一个函数(迭代地提交给每个工作程序),其中该函数仅加载该计算机/分片本地的数据。这行得通,而且我有一堆具有相同列模式的最佳本地数据框-但是,现在我没有单个数据框,而是n个数据框。是否可以跨多台机器合并/融合数据帧,以便只有一个数据帧引用,但是部分对特定机器具有亲和力(在一定范围内,由任务DAG决定)?

dask dask-distributed

5
推荐指数
1
解决办法
102
查看次数

Dad等效于pd.to_numeric

我正在尝试使用dask read_csv读取多个CSV文件,每个文件约15 GB。在执行此任务时,dask会将特定的列解释为float,但是它具有一些字符串类型的值,后来当我尝试执行某些操作以表明它无法将字符串转换为float时,它将失败。因此,我使用dtype = str参数将所有列读取为字符串。现在,我想将特定的列转换为带有errors ='coerce'的数值,以便将那些包含字符串的记录转换为NaN值,其余的转换为正确的浮点数。您能告诉我如何使用dask来实现吗?

已经尝试过:astype转换

import dask.dataframe as dd
df = dd.read_csv("./*.csv", encoding='utf8', 
                 assume_missing = True, 
                 usecols =col_names.values.tolist(),
                    dtype=str)
df["mycol"] = df["mycol"].astype(float)
search_df = df.query('mycol >0').compute()
Run Code Online (Sandbox Code Playgroud)
ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+-----------------------------------+--------+----------+
| Column                            | Found  | Expected |
+-----------------------------------+--------+----------+
| mycol                             | object | float64  |
+-----------------------------------+--------+----------+

The following columns also raised exceptions on conversion:

- mycol
  ValueError("could not convert string to float: 'cliqz.com/tracking'")
Run Code Online (Sandbox Code Playgroud)
#Reproducible example
import dask.dataframe as dd
df = dd.read_csv("mydata.csv", encoding='utf8', 
                 assume_missing = …
Run Code Online (Sandbox Code Playgroud)

dask dask-distributed

5
推荐指数
1
解决办法
308
查看次数

不断收到“distributed.utils_perf - 警告 - 完整的垃圾收集花费了 19% 的 CPU 时间......”

在我完成 DASK 代码后,我不断收到“distributed.utils_perf - 警告 - 完整的垃圾收集最近占用了 19% CPU 时间”警告消息。我正在使用 DASK 进行大型地震数据计算。计算完成后,我将计算出的数据写入磁盘。写入磁盘部分比计算花费的时间要长得多。在我将数据写入磁盘之前,我调用了 client.close(),我假设我已经完成了 DASK。但是“distributed.utils_perf - 警告 - 完整的垃圾收集最近占用了 19% 的 CPU 时间”不断出现。当我进行计算时,我收到了 3-4 次此警告消息。但是当我将数据写入磁盘时,我每 1 秒收到一次警告。我怎样才能摆脱这个烦人的警告?谢谢。

dask-distributed

5
推荐指数
1
解决办法
1876
查看次数

Dask Distributed.nanny - 警告 - 重新启动工作线程问题

我正在使用 Dask,但有点困惑。

我运行下面的命令并得到这个,直到进程崩溃。

当出现故障时,它使用了所有 4 个 CPU 核心的 100%;

有人能给我建议吗?

distributed.nanny - WARNING - Restarting worker
Run Code Online (Sandbox Code Playgroud)

这是代码

import pandas as pd
import dask.dataframe as dd
import numpy as np
import time
from dask.distributed import Client
client = Client()
%time dahsn = dd.read_csv("US_Accidents_Dec19.csv")
dahsn.groupby('City').count().compute()
Run Code Online (Sandbox Code Playgroud)

python-3.x dask-distributed

5
推荐指数
0
解决办法
2158
查看次数

如何检查是否有一个已经在运行的 dask 调度程序?

我想从具有特定数量的工作人员的 python 启动本地集群,然后将客户端连接到它。

cluster = LocalCluster(n_workers=8, ip='127.0.0.1')
client = Client(cluster)
Run Code Online (Sandbox Code Playgroud)

但在此之前,我想检查是否存在现有的本地集群,例如由 dask-scheduler 命令启动。有没有办法做到这一点 ?

dask dask-distributed

4
推荐指数
1
解决办法
2186
查看次数

Dask 计算速度很慢

我有一个包含 500 万条记录的数据框。我试图通过利用 python 中的 dask 数据帧使用下面的代码来处理它

 import dask.dataframe as dd                                          
 dask_df = dd.read_csv(fullPath)
 ............
 for index , row in uniqueURLs.iterrows():
   print(index);
   results = dask_df[dask_df['URL'] == row['URL']]
   count = results.size.compute();
Run Code Online (Sandbox Code Playgroud)

但我注意到 dask 在过滤数据帧方面非常有效,但不是在 .compute() 中。因此,如果我删除了计算结果大小的行,我的程序就会变得非常快。有人可以解释一下吗?我怎样才能让它更快?

python performance python-3.x dask dask-distributed

4
推荐指数
1
解决办法
3205
查看次数

Dask多阶段资源设置导致Failed to Serialize错误

使用 Dask 文档中的确切代码: https://jobqueue.dask.org/en/latest/examples.html

如果页面发生变化,代码如下:

from dask_jobqueue import SLURMCluster
from distributed import Client
from dask import delayed

cluster = SLURMCluster(memory='8g',
                       processes=1,
                       cores=2,
                       extra=['--resources ssdGB=200,GPU=2'])

cluster.scale(2)
client = Client(cluster)

def step_1_w_single_GPU(data):
    return "Step 1 done for: %s" % data


def step_2_w_local_IO(data):
    return "Step 2 done for: %s" % data


stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s2) for s2 in stage_1]

result_stage_2 = client.compute(stage_2,
                                resources={tuple(stage_1): {'GPU': 1},
                                           tuple(stage_2): {'ssdGB': 100}})
Run Code Online (Sandbox Code Playgroud)

这会导致这样的错误:

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback …
Run Code Online (Sandbox Code Playgroud)

python python-3.x dask dask-delayed dask-distributed

4
推荐指数
1
解决办法
244
查看次数