我遇到了一个问题,我的分布式集群似乎“挂起” - 例如,任务停止处理,因此积压了未处理的任务,因此我正在寻找某种方法来帮助调试正在发生的事情。
有Client一个processing方法可以告诉我每个工作人员当前正在运行哪些任务,但 AFAICS 这是有关对象上可用任务的唯一信息吗Client?
我想要的是不仅能够查询处理任务,还能够查询所有任务,包括已处理、正在处理和出错的任务,并且每个任务都能够获取一些统计信息,例如submitted_time和 ,completion_time这将使我能够找出哪些任务正在阻塞集群。
一个很好的事情是能够获得args/kwargs任何给定的任务。这对于调试失败的任务特别有帮助。
目前是否有任何此功能可用,或者有什么方法可以获取我想要的信息?
关于如何调试问题的任何其他建议都将受到极大欢迎。
这与如何在 Pandas 中使用 apply 并行化许多(模糊)字符串比较有关?
再次考虑这个简单(但有趣)的例子:
import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd
master= pd.DataFrame({'original':['this is a nice sentence',
'this is another one',
'stackoverflow is nice']})
slave= pd.DataFrame({'name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'stackoverflow is nice'],'my_value': [1,2,3,4,5]})
def fuzzy_score(str1, str2):
return fuzz.token_set_ratio(str1, str2)
def helper(orig_string, slave_df):
slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
#return my_value corresponding to the highest score
return slave_df.loc[slave_df.score.idxmax(),'my_value']
master
Out[39]:
original …Run Code Online (Sandbox Code Playgroud) 为了更好地理解并行,我比较了一组不同的代码。
这是基本的(code_piece_1)。
import time
# setup
problem_size = 1e7
items = range(9)
# serial
def counter(num=0):
junk = 0
for i in range(int(problem_size)):
junk += 1
junk -= 1
return num
def sum_list(args):
print("sum_list fn:", args)
return sum(args)
start = time.time()
summed = sum_list([counter(i) for i in items])
print(summed)
print('for loop {}s'.format(time.time() - start))
Run Code Online (Sandbox Code Playgroud)
此代码以串行方式(for 循环)运行时间消耗者并得到此结果
sum_list fn: [0, 1, 2, 3, 4, 5, 6, 7, 8]
36
for loop 8.7735116481781s
Run Code Online (Sandbox Code Playgroud)
多处理风格是否可以被视为实现并行计算的一种方式?
我认为是的,因为医生是这么说的。
这是code_piece_2
import multiprocessing …Run Code Online (Sandbox Code Playgroud) 使用Pycharm社区2018.1.4
Python 3.6
Dask 2.8.1
尝试在我的某些方法上实现 dask 延迟并收到错误
AttributeError: module 'dask' has no attribute 'delayed'.
Run Code Online (Sandbox Code Playgroud)
这显然不是真的,所以我想知道我做错了什么。我的实现结构如下:
AttributeError: module 'dask' has no attribute 'delayed'.
Run Code Online (Sandbox Code Playgroud)
本质上,我有一堆数据文件,它们是彼此独立的,所以我想并行运行它们,而不是通过 for 循环顺序运行,如果你取出 dask.delayed ,我就会这样做。
我是否从根本上错过了上述 dask 延迟实现中的任何内容?
我试图了解 BlazingSQL 是 dask 的竞争对手还是补充。
我有一些中等大小的数据 (10-50GB) 作为镶木地板文件保存在 Azure blob 存储中。
IIUC 我可以使用 SQL 语法使用 BlazingSQL 查询、加入、聚合、分组,但我也可以dask_cudf使用 python/ dataframe 语法将数据读入 CuDF并执行所有相同的操作。
所以,在我看来他们是直接的竞争对手?
使用 dask 的(其中一个)好处是它可以对分区进行操作,因此可以对大于 GPU 内存的数据集进行操作,而 BlazingSQL 仅限于适合 GPU 的内容,这是否正确?
为什么会选择使用 BlazingSQL 而不是 dask?
编辑:
文档讨论dask_cudf但实际存储库已存档,说 dask 支持现在cudf本身。最好知道如何利用dask比 gpu 内存更大的数据集进行操作cudf
我正在处理大型 CSV 文件,我需要制作笛卡尔积(合并操作)。我尝试用 Pandas 解决这个问题(您可以在此处检查 Panda 的代码和数据格式示例以了解同一问题) ,但由于内存错误而没有成功。现在,我正在尝试使用 Dask,它应该可以管理巨大的数据集,即使其大小大于可用 RAM。
首先我读了两个 CSV:
from dask import dataframe as dd
BLOCKSIZE = 64000000 # = 64 Mb chunks
df1_file_path = './mRNA_TCGA_breast.csv'
df2_file_path = './miRNA_TCGA_breast.csv'
# Gets Dataframes
df1 = dd.read_csv(
df1_file_path,
delimiter='\t',
blocksize=BLOCKSIZE
)
first_column = df1.columns.values[0]
df1.set_index(first_column)
df2 = dd.read_csv(
df2_file_path,
delimiter='\t',
blocksize=BLOCKSIZE
)
first_column = df2.columns.values[0]
df2.set_index(first_column)
# Filter common columns
common_columns = df1.columns.intersection(df2.columns)
df1 = df1[common_columns]
df2 = df2[common_columns]
Run Code Online (Sandbox Code Playgroud)
然后,我将操作存储在磁盘上以防止内存错误:
# Computes a Cartesian …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 DASK 和以下代码片段分割镶木地板文件
import dask.dataframe as pd
df = pd.read_parquet(dataset_path, chunksize="100MB")
df.repartition(partition_size="100MB")
pd.to_parquet(df,output_path)
Run Code Online (Sandbox Code Playgroud)
我的输入中只有一个物理文件,即 file.parquet
该脚本的输出也只有一个文件,即part.0.parquet。
根据partition_size和chunksize参数,我应该在输出中有多个文件
任何帮助,将不胜感激
我是使用Dask和Numba加速代码的新手,我希望这对用户来说是一个有价值的问题,可以帮助他们获得有关如何并行化代码的最佳实践的答案。我已经制作了一个包含 3 列的数据框的通用测试用例pandas。
将在框架中的 3 个向量上实现通用函数,以表示数据分析中可能进行的转换类型:前两列进行平方、相加,然后取平方根,然后是布尔值将结果与第三列进行比较计算。
我实现了 4 个测试用例:(a) a pandasapply、(b) Dask、(c)Numba和 (d)Dask以及Numba在一起。
Numba效果很好。我所有的问题都与Dask. 以下是我遇到的问题:
Dask,无论我制作什么大小的向量,都会更慢。我可能不完全理解如何以及何时计算数据帧的某些部分,或者如何使其正确并行化。它比常规申请慢。# Practice parallelizing
from dask import dataframe as dd
from numba import jit
import pandas as pd
import numpy as np
import time
# df is going to be the regular dataframe
df = pd.DataFrame(np.random.random(size=(1000000,3))*100,columns=['col1','col2','col3'])
# ddf is the dask dataframe …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 Dask 编写镶木地板文件。目标是使用它的repartition功能,但似乎我无法写出一个简单的镶木地板文件,而不进行这repartition一步......
这是我用来从 pyarrow 创建 parquet 文件的代码,通过 dask 读回它,然后再次写入。
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
import dask.dataframe as dd
file = 'example.parquet'
file_res = 'example_res.parquet'
# Generate a random df
df = pd.DataFrame(np.random.randint(100,size=(100000, 20)),columns=['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T'])
# Write 1st parquet file with pyarrow
table = pa.Table.from_pandas(df)
pq.write_table(table, file, version='1.0')
# Read it back with Dask, and write it again
dd_df = dd.read_parquet(file)
dd_df.to_parquet(file_res)
Run Code Online (Sandbox Code Playgroud)
最后一个写入步骤以 结束TypeError: expected …
我有一个需要根据某些条件进行过滤的项目列表。我想知道 Dask 是否可以并行执行此过滤,因为列表很长(几十万条记录)。
基本上,我需要做的是:
items = [
{'type': 'dog', 'weight': 10},
{'type': 'dog', 'weight': 20},
{'type': 'cat', 'weight': 15},
{'type': 'dog', 'weight': 30},
]
def item_is_valid(item):
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False
# ...
# elif for n conditions
return item_is_valid
items_filtered = [item for item in items if item_is_valid(item)]
Run Code Online (Sandbox Code Playgroud)
通过 Dask,我实现了以下目标:
def item_is_valid_v2(item):
"""Return the whole item if valid."""
item_is_valid = True
if item['type']=='cat':
item_is_valid = False
elif item['weight']>20:
item_is_valid = False …Run Code Online (Sandbox Code Playgroud) dask ×10
python ×7
dask-delayed ×2
pandas ×2
parquet ×2
bag ×1
cudf ×1
data-science ×1
dictionary ×1
distributed ×1
gpu ×1
numba ×1
numpy ×1
python-3.x ×1