标签: dask

如何获取有关特定 Dask 任务的信息

我遇到了一个问题,我的分布式集群似乎“挂起” - 例如,任务停止处理,因此积压了未处理的任务,因此我正在寻找某种方法来帮助调试正在发生的事情。

Client一个processing方法可以告诉我每个工作人员当前正在运行哪些任务,但 AFAICS 这是有关对象上可用任务的唯一信息吗Client

我想要的是不仅能够查询处理任务,还能够查询所有任务,包括已处理、正在处理和出错的任务,并且每个任务都能够获取一些统计信息,例如submitted_time和 ,completion_time这将使我能够找出哪些任务正在阻塞集群。

这类似于ipyparallel.AsyncResult

一个很好的事情是能够获得args/kwargs任何给定的任务。这对于调试失败的任务特别有帮助。

目前是否有任何此功能可用,或者有什么方法可以获取我想要的信息?

关于如何调试问题的任何其他建议都将受到极大欢迎。

python distributed dask

3
推荐指数
1
解决办法
1591
查看次数

为什么 Pandas 中的多重处理比简单计算慢?

这与如何在 Pandas 中使用 apply 并行化许多(模糊)字符串比较有关?

再次考虑这个简单(但有趣)的例子:

import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd

master= pd.DataFrame({'original':['this is a nice sentence',
'this is another one',
'stackoverflow is nice']})

slave= pd.DataFrame({'name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'stackoverflow is nice'],'my_value': [1,2,3,4,5]})

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
    #return my_value corresponding to the highest score
    return slave_df.loc[slave_df.score.idxmax(),'my_value']

master
Out[39]: 
                  original …
Run Code Online (Sandbox Code Playgroud)

python multiprocessing pandas python-multiprocessing dask

3
推荐指数
1
解决办法
4141
查看次数

为什么 Dask 执行速度如此之慢,而多处理执行速度却如此之快?

为了更好地理解并行,我比较了一组不同的代码。

这是基本的(code_piece_1)。

for循环

import time

# setup
problem_size = 1e7
items = range(9)

# serial
def counter(num=0):
    junk = 0
    for i in range(int(problem_size)):
        junk += 1
        junk -= 1
    return num

def sum_list(args):
    print("sum_list fn:", args)
    return sum(args)

start = time.time()
summed = sum_list([counter(i) for i in items])
print(summed)
print('for loop {}s'.format(time.time() - start))
Run Code Online (Sandbox Code Playgroud)

此代码以串行方式(for 循环)运行时间消耗者并得到此结果

sum_list fn: [0, 1, 2, 3, 4, 5, 6, 7, 8]
36
for loop 8.7735116481781s
Run Code Online (Sandbox Code Playgroud)

多重处理

多处理风格是否可以被视为实现并行计算的一种方式?

我认为是的,因为医生是这么说的。

这是code_piece_2

import multiprocessing …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing parallelism-amdahl dask

3
推荐指数
1
解决办法
8466
查看次数

属性错误:模块“dask”没有属性“延迟”

使用Pycharm社区2018.1.4
Python 3.6
Dask 2.8.1

尝试在我的某些方法上实现 dask 延迟并收到错误

AttributeError: module 'dask' has no attribute 'delayed'.
Run Code Online (Sandbox Code Playgroud)

这显然不是真的,所以我想知道我做错了什么。我的实现结构如下:

AttributeError: module 'dask' has no attribute 'delayed'.
Run Code Online (Sandbox Code Playgroud)

本质上,我有一堆数据文件,它们是彼此独立的,所以我想并行运行它们,而不是通过 for 循环顺序运行,如果你取出 dask.delayed ,我就会这样做。
我是否从根本上错过了上述 dask 延迟实现中的任何内容?

dask dask-delayed

3
推荐指数
1
解决办法
9141
查看次数

BlazingSQL 和 dask 是什么关系?

我试图了解 BlazingSQL 是 dask 的竞争对手还是补充。

我有一些中等大小的数据 (10-50GB) 作为镶木地板文件保存在 Azure blob 存储中。

IIUC 我可以使用 SQL 语法使用 BlazingSQL 查询、加入、聚合、分组,但我也可以dask_cudf使用 python/ dataframe 语法将数据读入 CuDF并执行所有相同的操作。

所以,在我看来他们是直接的竞争对手?

使用 dask 的(其中一个)好处是它可以对分区进行操作,因此可以对大于 GPU 内存的数据集进行操作,而 BlazingSQL 仅限于适合 GPU 的内容,这是否正确?

为什么会选择使用 BlazingSQL 而不是 dask?

编辑:
文档讨论dask_cudf但实际存储已存档,说 dask 支持现在cudf本身。最好知道如何利用dask比 gpu 内存更大的数据集进行操作cudf

gpu parquet dask cudf

3
推荐指数
1
解决办法
288
查看次数

即使有块,Dask 也会出现内存不足的情况

我正在处理大型 CSV 文件,我需要制作笛卡尔积(合并操作)。我尝试用 Pandas 解决这个问题(您可以在此处检查 Panda 的代码和数据格式示例以了解同一问题) ,但由于内存错误而没有成功。现在,我正在尝试使用 Dask,它应该可以管理巨大的数据集,即使其大小大于可用 RAM。

首先我读了两个 CSV:

from dask import dataframe as dd

BLOCKSIZE = 64000000  # = 64 Mb chunks


df1_file_path = './mRNA_TCGA_breast.csv'
df2_file_path = './miRNA_TCGA_breast.csv'

# Gets Dataframes
df1 = dd.read_csv(
    df1_file_path,
    delimiter='\t',
    blocksize=BLOCKSIZE
)
first_column = df1.columns.values[0]
df1.set_index(first_column)
df2 = dd.read_csv(
    df2_file_path,
    delimiter='\t',
    blocksize=BLOCKSIZE
)
first_column = df2.columns.values[0]
df2.set_index(first_column)

# Filter common columns
common_columns = df1.columns.intersection(df2.columns)
df1 = df1[common_columns]
df2 = df2[common_columns]
Run Code Online (Sandbox Code Playgroud)

然后,我将操作存储在磁盘上以防止内存错误:

# Computes a Cartesian …
Run Code Online (Sandbox Code Playgroud)

python python-3.x dask

3
推荐指数
1
解决办法
5683
查看次数

使用 dask 将 parquet 文件拆分为较小的块

我正在尝试使用 DASK 和以下代码片段分割镶木地板文件

import dask.dataframe as pd
df = pd.read_parquet(dataset_path, chunksize="100MB")
df.repartition(partition_size="100MB")
pd.to_parquet(df,output_path)
Run Code Online (Sandbox Code Playgroud)

我的输入中只有一个物理文件,即 file.parquet

该脚本的输出也只有一个文件,即part.0.parquet。

根据partition_size和chunksize参数,我应该在输出中有多个文件

任何帮助,将不胜感激

python dask data-science

3
推荐指数
1
解决办法
7904
查看次数

如何使用 Numba + Dask 正确并行化通用代码

我是使用DaskNumba加速代码的新手,我希望这对用户来说是一个有价值的问题,可以帮助他们获得有关如何并行化代码的最佳实践的答案。我已经制作了一个包含 3 列的数据框的通用测试用例pandas

将在框架中的 3 个向量上实现通用函数,以表示数据分析中可能进行的转换类型:前两列进行平方、相加,然后取平方根,然后是布尔值将结果与第三列进行比较计算。

我实现了 4 个测试用例:(a) a pandasapply、(b) Dask、(c)Numba和 (d)Dask以及Numba在一起。

Numba效果很好。我所有的问题都与Dask. 以下是我遇到的问题:

  1. Dask,无论我制作什么大小的向量,都会更慢。我可能不完全理解如何以及何时计算数据帧的某些部分,或者如何使其正确并行化。它比常规申请慢。
  2. 如何正确使用Dask进行并行化?我将其设计为 4 个分区,并且有 2 个核心处理器,但是您实际上如何决定如何格式化它?
# Practice parallelizing
from dask import dataframe as dd
from numba import jit
import pandas as pd
import numpy as np
import time

# df is going to be the regular dataframe
df = pd.DataFrame(np.random.random(size=(1000000,3))*100,columns=['col1','col2','col3'])

# ddf is the dask dataframe …
Run Code Online (Sandbox Code Playgroud)

numpy pandas numba dask

3
推荐指数
1
解决办法
2965
查看次数

将 dask 数据帧写入镶木地板:“TypeError”

我正在尝试使用 Dask 编写镶木地板文件。目标是使用它的repartition功能,但似乎我无法写出一个简单的镶木地板文件,而不进行这repartition一步......

这是我用来从 pyarrow 创建 parquet 文件的代码,通过 dask 读回它,然后再次写入。

import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import dask.dataframe as dd

file = 'example.parquet'
file_res = 'example_res.parquet'

# Generate a random df
df = pd.DataFrame(np.random.randint(100,size=(100000, 20)),columns=['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T'])

# Write 1st parquet file with pyarrow
table = pa.Table.from_pandas(df)
pq.write_table(table, file, version='1.0')

# Read it back with Dask, and write it again
dd_df = dd.read_parquet(file)
dd_df.to_parquet(file_res)
Run Code Online (Sandbox Code Playgroud)

最后一个写入步骤以 结束TypeError: expected …

python parquet dask

3
推荐指数
1
解决办法
2720
查看次数

并行列表过滤

我有一个需要根据某些条件进行过滤的项目列表。我想知道 Dask 是否可以并行执行此过滤,因为列表很长(几十万条记录)。

基本上,我需要做的是:

items = [
    {'type': 'dog', 'weight': 10},
    {'type': 'dog', 'weight': 20},
    {'type': 'cat', 'weight': 15},
    {'type': 'dog', 'weight': 30},
]

def item_is_valid(item):
    item_is_valid = True

    if item['type']=='cat':
        item_is_valid = False
    elif item['weight']>20:
        item_is_valid = False
    # ...
    # elif for n conditions

    return item_is_valid

items_filtered = [item for item in items if item_is_valid(item)]

Run Code Online (Sandbox Code Playgroud)

通过 Dask,我实现了以下目标:

def item_is_valid_v2(item):
    """Return the whole item if valid."""
    item_is_valid = True

    if item['type']=='cat':
        item_is_valid = False
    elif item['weight']>20:
        item_is_valid = False …
Run Code Online (Sandbox Code Playgroud)

python dictionary bag dask dask-delayed

3
推荐指数
1
解决办法
154
查看次数