标签: dask

替换 dask 数据框分区

我可以将 dask 数据帧分区替换为我单独创建的另一个具有相同行数和相同结构的 dask 数据帧分区吗?如果是,怎么办?

是否可以使用不同的行数?

dask

2
推荐指数
1
解决办法
360
查看次数

如何让所有工人在dask中执行相同的任务?

我想让所有工人做同样的任务,像这样:

from dask import distributed
from distributed import Client,LocalCluster
import dask
import socket


def writer(filename,data):
    with open(filename,'w') as f:
        f.writelines(data)

def get_ip(x):
    return socket.gethostname()
    #writer('/data/1.txt',a)
client = Client('192.168.123.1:8786')

A=client.submit(get_ip, 0,workers=['w1','w2'], pure=False)
print(client.ncores(),
        client.scheduler_info()
#       dask.config.get('distributed')
     )
A.result()  
Run Code Online (Sandbox Code Playgroud)

我有 2 个工人,但只打印 1 个工人的主机名

dask dask-distributed

2
推荐指数
1
解决办法
952
查看次数

使用 Dask 从谷歌云存储读取镶木地板文件

我正在尝试使用 Dask 从 google 存储桶读取和写入。使用一堆csv文件可以,但不方便(速度较慢,无法压缩,无法仅读取某些列),所以我尝试使用该apache parquet格式。

看起来写得很好:

import dask.dataframe as dd
pandas_df = pd.DataFrame({'x' : [2,3, 2], 'y': [1, 0, 0]})
dask_df = dd.from_pandas(pandas_df, npartitions=2)
dask_df.to_parquet("gcs://my_google_bucket/test/")
Run Code Online (Sandbox Code Playgroud)

但当我尝试读回来时

read_again_df = dd.read_parquet("gcs://my_google_bucket/test/") 
Run Code Online (Sandbox Code Playgroud)

我收到一个未实现的错误:

AttributeError                            Traceback (most recent call last)
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/bytes/core.py in get_pyarrow_filesystem(fs)
    520     try:
--> 521         return fs._get_pyarrow_filesystem()
    522     except AttributeError:

AttributeError: 'DaskGCSFileSystem' object has no attribute '_get_pyarrow_filesystem'

During handling of the above exception, another exception occurred:

NotImplementedError                       Traceback (most recent call last)
<ipython-input-42-ef1fc41d04d5> in <module>()
----> 1 …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-storage parquet dask pyarrow

2
推荐指数
1
解决办法
5338
查看次数

来自延迟 zip csv 的 Dask 数据帧

我正在尝试从一组压缩的 CSV 文件创建 dask 数据框。阅读问题,似乎 dask 需要使用 dask.distributedelasted()

import glob
import dask.dataframe as dd
import zipfile
import pandas as pd 
from dask.delayed import delayed

#Create zip_dict with key-value pairs for .zip & .csv names
file_list = glob.glob('my_directory/zip_files/')
zip_dict = {}
for f in file_list:
    key = f.split('/')[5][:-4]
    zip_dict[key] = zipfile.ZipFile(f)
Run Code Online (Sandbox Code Playgroud)

zip_dict = {'log20160201': zipfile.ZipFile filename='/my_directory/zip_files/log20160201.zip' mode='r', 'log20160218': zipfile.ZipFile filename='/my_directory/zip_files/log20160218.zip' 的示例内容模式='r'}

# Create list of delayed pd.read_csv()    
d_rows = []
for k, v in zip_dict.items():

    row = delayed(pd.read_csv)(v.open(k+'.csv'),usecols=['time','cik'])
    d_rows.append(row)
    v.close()
Run Code Online (Sandbox Code Playgroud)

d_rows …

zip pandas dask dask-delayed

2
推荐指数
1
解决办法
3141
查看次数

将 dask 数据框中的列转换为 Doc2Vec 的 TaggedDocument

介绍

目前,我正在尝试将 dask 与 gensim 配合使用来进行 NLP 文档计算,并且在将我的语料库转换为“ TaggedDocument ”时遇到问题。

因为我尝试了很多不同的方法来解决这个问题,所以我将列出我的尝试。

每次处理这个问题的尝试都会遇到略有不同的困境。

首先是一些初步的假设。

数据

df.info()
<class 'dask.dataframe.core.DataFrame'>
Columns: 5 entries, claim_no to litigation
dtypes: object(2), int64(3)
Run Code Online (Sandbox Code Playgroud)
  claim_no   claim_txt I                                    CL ICC lit
0 8697278-17 battery comprising interior battery active ele... 106 2 0
Run Code Online (Sandbox Code Playgroud)

所需输出

>>tagged_document[0]
>>TaggedDocument(words=['battery', 'comprising', 'interior', 'battery', 'active', 'elements', 'battery', 'cell', 'casing', 'said', 'cell', 'casing', 'comprising', 'first', 'casing', 'element', 'first', 'contact', 'surface', 'second', 'casing', 'element', 'second', 'contact', 'surface', 'wherein', 'assembled', 'position', 'first', 'second', 'contact', 'surfaces', 'contact', 'first', 'second', …
Run Code Online (Sandbox Code Playgroud)

python gensim dask doc2vec

2
推荐指数
1
解决办法
1335
查看次数

如何从 Dask 调度程序获取仪表板地址

启动 dask 分布式本地集群时,您可以为 .dask 设置随机端口或地址dashboard_address

如果你后来得到了这个scheduler对象。有没有办法提取仪表板的地址。

我有这个:

cluster = dask.distributed.LocalCluster(scheduler_port=0,
                                        dashboard_address='localhost:0')
scheduler = dask.distributed.Client(cluster, set_as_default=False)
scheduler_info = scheduler.scheduler_info()
logger.info('Scheduler: %s', scheduler_info['address'])
logger.info('Status Port: %s', scheduler_info['services']['dashboard'])
Run Code Online (Sandbox Code Playgroud)

但这只能获取仪表板的端口,而不是仪表板的 IP。如果我将仪表板地址放在调度程序之外的单独 IP 上,似乎很难知道它绑定到哪个 IP。

dask dask-distributed

2
推荐指数
1
解决办法
2526
查看次数

读取大量 parquet 文件:read_parquet 与 from_delayed

我正在将大量(100 到 1000)的镶木地板文件读取到单个 dask 数据帧(单台机器,全部本地)中。我意识到

files = ['file1.parq', 'file2.parq', ...]
ddf = dd.read_parquet(files, engine='fastparquet')
ddf.groupby(['col_A', 'col_B']).value.sum().compute()
Run Code Online (Sandbox Code Playgroud)

效率比​

from dask import delayed
from fastparquet import ParquetFile

@delayed
def load_chunk(pth):
    return ParquetFile(pth).to_pandas()

ddf = dd.from_delayed([load_chunk(f) for f in files])
ddf.groupby(['col_A', 'col_B']).value.sum().compute()
Run Code Online (Sandbox Code Playgroud)

对于我的特定应用程序,第二种方法 ( from_delayed) 需要 6 秒才能完成,第一种方法需要 39 秒。在这种dd.read_parquet情况下,在工作人员开始做某事之前似乎就有很多开销,并且有相当多的transfer-...操作分散在任务流图中。我想了解这里发生了什么事。read_parquet该方法速度如此之慢的原因可能是什么?它与仅仅读取文件并将它们分成块有什么不同?

python pandas dask fastparquet

2
推荐指数
1
解决办法
2219
查看次数

如何使用多列作为函数输入将自定义函数应用于 dask 数据框中的组

我有一个非常大的数据框,正在使用dask处理。数据框大体上看起来像这样:

Col_1    Col_2   Bool_1   Bool_2
A        1       True     False
B        1       True     True
C        1       False    False
D        1       True     False
A        2       False    True
B        2       False    False
C        2       True     False
D        2       True     True
Run Code Online (Sandbox Code Playgroud)

但它有数百万行。

我在代码的这一点上想做的是计算中形成的每个组之间的Jaccard 距离Bool_1和。这是因为该程序的目的是为其中存在的每个组生成一行(每行都有多个统计数据,我仅报告相关列)。Bool_2Col_2Col_2

为此,我首先Col_2使用来对数据帧进行分组df.groupby("Col_2"),但随后我不知道如何继续。到目前为止,我尝试的每一次尝试都引发了错误。

1:我尝试定义一个函数compute_jacc_dist()并将其传递apply(compute_jacc_dist, axis=1)给组,但它在 args 和 kwargs 方面存在问题(尤其是轴,请参阅https://github.com/dask/dask/issues/1572,我还无法解决)。

2:我尝试使用它来计算和from dask_distance import jaccard之间的 J 距离,但它会产生奇怪的结果(即使没有交集,每个组也会返回 J=1)。Bool_1Bool_2 …

python group-by dataframe pandas dask

2
推荐指数
1
解决办法
2861
查看次数

如何将 Dask DataFrame 转换为字典列表?

我需要将 dask 数据帧转换为字典列表作为 API 端点的响应。我知道我可以将 dask 数据帧转换为 pandas,然后从那里我可以转换为字典,但最好将每个分区映射到一个字典,然后连接。

我尝试过的:

df = dd.read_csv(path, usecols=cols)

dd.compute(df.to_dict(orient='records'))
Run Code Online (Sandbox Code Playgroud)

我收到的错误:

AttributeError: 'DataFrame' object has no attribute 'to_dict'
Run Code Online (Sandbox Code Playgroud)

python parallel-processing dictionary pandas dask

2
推荐指数
1
解决办法
3953
查看次数

ValueError:样本不够大,无法包含至少一行数据。请增加“sample”中的字节数

我正在尝试读取 csv 文件(2GB)。由于大小很大,我使用了 dask,但它显示 ValueError:样本不够大,无法包含至少一行数据。sample请增加调用中的字节数read_csv/read_table 任何人都可以建议我如何解决它吗?谢谢

代码:

import dask.dataframe as dd
df= dd.read_csv('file1.csv')
Run Code Online (Sandbox Code Playgroud)

错误:

ValueError: Sample is not large enough to include at least one row of data. Please increase the number of bytes in `sample` in the call to `read_csv`/`read_table`
Run Code Online (Sandbox Code Playgroud)

python dask

2
推荐指数
1
解决办法
3910
查看次数