我想知道在与Dask进行groupBy聚合后,是否有可能从给定的列中获得多个唯一项。我在文档中看不到任何类似信息。它在pandas数据框上可用,并且非常有用。我已经看到一些与此相关的问题,但是我不确定它是否已实现。
有人可以给我一些提示吗?
我有一个带有n个worker的dask集群,并希望worker对数据库进行查询.但是数据库只能并行处理m个查询,其中m <n.我如何在dask.distributed中对其进行建模?只有m个工人应该并行完成这项任务.
我已经看到分布式支持锁(http://distributed.readthedocs.io/en/latest/api.html#distributed.Lock).但有了这个,我只能并行执行一个查询,而不是m.
我也看到我可以为每个工人定义资源(https://distributed.readthedocs.io/en/latest/resources.html).但这也不合适,因为数据库独立于工人.我要么必须为每个worker定义1个数据库资源(这会导致太多的并行查询).或者我必须将m个数据库资源分配给n个工作者,这在设置集群和执行中次优时很困难.
是否可以在dask中定义类似信号量的东西来解决这个问题?
我最近开始在Dask寻找大数据。我对有效并行应用操作有疑问。
说我有一些这样的销售数据:
customerKey产品Key交易Key总销售净销售额单位销售量交易日期
----------- -------------- ---------------- --------- --------- ---------- ------ --------------------
20353 189 219548 0.921058 0.921058 1 1 2017-02-01 00:00:00
2596618 189 215015 0.709997 0.709997 1 1 2017-02-01 00:00:00
30339435 189 215184 0.918068 0.918068 1 1 2017-02-01 00:00:00
32714675 189 216656 0.751007 0.751007 1 1 2017-02-01 00:00:00
39232537 189 218180 0.752392 0.752392 1 1 2017-02-01 00:00:00
41722826 189 216806 0.0160143 0.0160143 1 1 2017-02-01 00:00:00
46525123 189 219875 0.469437 0.469437 1 1 2017-02-01 00:00:00
51024667 189 215457 0.244886 0.244886 1 … 我正在尝试将大型Dask数据帧分布在多台计算机上,以便在数据帧上进行(后来的)分布式计算。我为此使用dask-distributed。
我看到的所有分发的示例/文档都是从网络资源(hdfs,s3等)填充初始数据负载,并且似乎没有将DAG优化扩展到负载部分(似乎假设网络负载是在另一个问题的答案上强调了这一点:Dask是否与HDFS通信以优化数据局部性?
但是,我可以看到一些我们想要的情况。例如,如果我们在此数据库的节点上共存有一个分片数据库+ dask worker,我们希望将仅来自本地分片的记录强制填充到本地dask worker中。从文档/示例来看,网络冲突似乎是必须承担的成本。是否可以强制从特定工作人员获取单个数据框的一部分?
我尝试过的替代方法是,尝试强制每个工作程序运行一个函数(迭代地提交给每个工作程序),其中该函数仅加载该计算机/分片本地的数据。这行得通,而且我有一堆具有相同列模式的最佳本地数据框-但是,现在我没有单个数据框,而是n个数据框。是否可以跨多台机器合并/融合数据帧,以便只有一个数据帧引用,但是部分对特定机器具有亲和力(在一定范围内,由任务DAG决定)?
我正在尝试使用dask read_csv读取多个CSV文件,每个文件约15 GB。在执行此任务时,dask会将特定的列解释为float,但是它具有一些字符串类型的值,后来当我尝试执行某些操作以表明它无法将字符串转换为float时,它将失败。因此,我使用dtype = str参数将所有列读取为字符串。现在,我想将特定的列转换为带有errors ='coerce'的数值,以便将那些包含字符串的记录转换为NaN值,其余的转换为正确的浮点数。您能告诉我如何使用dask来实现吗?
已经尝试过:astype转换
import dask.dataframe as dd
df = dd.read_csv("./*.csv", encoding='utf8',
assume_missing = True,
usecols =col_names.values.tolist(),
dtype=str)
df["mycol"] = df["mycol"].astype(float)
search_df = df.query('mycol >0').compute()
Run Code Online (Sandbox Code Playgroud)
ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.
+-----------------------------------+--------+----------+
| Column | Found | Expected |
+-----------------------------------+--------+----------+
| mycol | object | float64 |
+-----------------------------------+--------+----------+
The following columns also raised exceptions on conversion:
- mycol
ValueError("could not convert string to float: 'cliqz.com/tracking'")
Run Code Online (Sandbox Code Playgroud)
#Reproducible example
import dask.dataframe as dd
df = dd.read_csv("mydata.csv", encoding='utf8',
assume_missing = …Run Code Online (Sandbox Code Playgroud) 在我完成 DASK 代码后,我不断收到“distributed.utils_perf - 警告 - 完整的垃圾收集最近占用了 19% CPU 时间”警告消息。我正在使用 DASK 进行大型地震数据计算。计算完成后,我将计算出的数据写入磁盘。写入磁盘部分比计算花费的时间要长得多。在我将数据写入磁盘之前,我调用了 client.close(),我假设我已经完成了 DASK。但是“distributed.utils_perf - 警告 - 完整的垃圾收集最近占用了 19% 的 CPU 时间”不断出现。当我进行计算时,我收到了 3-4 次此警告消息。但是当我将数据写入磁盘时,我每 1 秒收到一次警告。我怎样才能摆脱这个烦人的警告?谢谢。
我正在使用 Dask,但有点困惑。
我运行下面的命令并得到这个,直到进程崩溃。
当出现故障时,它使用了所有 4 个 CPU 核心的 100%;
有人能给我建议吗?
distributed.nanny - WARNING - Restarting worker
Run Code Online (Sandbox Code Playgroud)
这是代码
import pandas as pd
import dask.dataframe as dd
import numpy as np
import time
from dask.distributed import Client
client = Client()
%time dahsn = dd.read_csv("US_Accidents_Dec19.csv")
dahsn.groupby('City').count().compute()
Run Code Online (Sandbox Code Playgroud) 我想从具有特定数量的工作人员的 python 启动本地集群,然后将客户端连接到它。
cluster = LocalCluster(n_workers=8, ip='127.0.0.1')
client = Client(cluster)
Run Code Online (Sandbox Code Playgroud)
但在此之前,我想检查是否存在现有的本地集群,例如由 dask-scheduler 命令启动。有没有办法做到这一点 ?
我有一个包含 500 万条记录的数据框。我试图通过利用 python 中的 dask 数据帧使用下面的代码来处理它
import dask.dataframe as dd
dask_df = dd.read_csv(fullPath)
............
for index , row in uniqueURLs.iterrows():
print(index);
results = dask_df[dask_df['URL'] == row['URL']]
count = results.size.compute();
Run Code Online (Sandbox Code Playgroud)
但我注意到 dask 在过滤数据帧方面非常有效,但不是在 .compute() 中。因此,如果我删除了计算结果大小的行,我的程序就会变得非常快。有人可以解释一下吗?我怎样才能让它更快?
使用 Dask 文档中的确切代码: https://jobqueue.dask.org/en/latest/examples.html
如果页面发生变化,代码如下:
from dask_jobqueue import SLURMCluster
from distributed import Client
from dask import delayed
cluster = SLURMCluster(memory='8g',
processes=1,
cores=2,
extra=['--resources ssdGB=200,GPU=2'])
cluster.scale(2)
client = Client(cluster)
def step_1_w_single_GPU(data):
return "Step 1 done for: %s" % data
def step_2_w_local_IO(data):
return "Step 2 done for: %s" % data
stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s2) for s2 in stage_1]
result_stage_2 = client.compute(stage_2,
resources={tuple(stage_1): {'GPU': 1},
tuple(stage_2): {'ssdGB': 100}})
Run Code Online (Sandbox Code Playgroud)
这会导致这样的错误:
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback …Run Code Online (Sandbox Code Playgroud) dask-distributed ×10
dask ×8
python ×4
python-3.x ×3
dask-delayed ×1
dataframe ×1
pandas ×1
performance ×1