标签: dask

在dask中迭代GroupBy对象

是否有可能迭代一个dask GroupBy对象来访问底层数据帧?我试过了:

import dask.dataframe as dd
import pandas as pd
pdf = pd.DataFrame({'A':[1,2,3,4,5], 'B':['1','1','a','a','a']})
ddf = dd.from_pandas(pdf, npartitions = 3)
groups = ddf.groupby('B')
for name, df in groups:
    print(name)
Run Code Online (Sandbox Code Playgroud)

但是,这会导致错误: KeyError: 'Column not found: 0'

更一般地说,除了apply方法之外,dask GroupBy对象允许哪种交互?

python pandas dask

7
推荐指数
2
解决办法
2291
查看次数

如何将压缩(gz)CSV文件读入dask Dataframe?

有没有办法读取通过gz压缩到dask数据帧的.csv文件?

我直接尝试过

import dask.dataframe as dd
df = dd.read_csv("Data.gz" )
Run Code Online (Sandbox Code Playgroud)

但得到一个unicode错误(可能是因为它正在解释压缩的字节)有一个"compression"参数但compression = "gz"不起作用,到目前为止我找不到任何文档.

使用pandas我可以直接读取文件而不会产生问题,除了结果炸毁了我的记忆;-)但是如果我限制行数它可以正常工作.

import pandas.Dataframe as pd
df = pd.read_csv("Data.gz", ncols=100)
Run Code Online (Sandbox Code Playgroud)

python csv pandas dask

7
推荐指数
2
解决办法
5821
查看次数

Dask.distributed的扩展限制是多少?

Dask.distributed部署有数百个工作节点的传闻吗?分布是否意味着扩展到这个大小的集群?

python distributed-computing dask

7
推荐指数
1
解决办法
438
查看次数

基于2个现有列的值将新列分配(添加)到dask数据帧 - 涉及条件语句

我想基于2个现有列的值向现有的dask数据帧添加一个新列,并涉及一个用于检查空值的条件语句:

DataFrame定义

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [0.2, "", 0.345, 0.40, 0.15]})
ddf = dd.from_pandas(df1, npartitions=2)
Run Code Online (Sandbox Code Playgroud)

方法-1尝试过

def funcUpdate(row):
    if row['y'].isnull():
        return row['y']
    else:
        return  round((1 + row['x'])/(1+ 1/row['y']),4)

ddf = ddf.assign(z= ddf.apply(funcUpdate, axis=1 , meta = ddf))
Run Code Online (Sandbox Code Playgroud)

它给出了一个错误:

TypeError: Column assignment doesn't support type DataFrame
Run Code Online (Sandbox Code Playgroud)

方法2

ddf = ddf.assign(z = ddf.apply(lambda col: col.y if col.y.isnull() else  round((1 + col.x)/(1+ 1/col.y),4),axis = 1, meta = ddf))
Run Code Online (Sandbox Code Playgroud)

知道应该怎么做吗?

python pandas dask

7
推荐指数
1
解决办法
6833
查看次数

有没有办法在dask中获得每组最大的项目?

我有以下数据集:

location  category    percent
A         5           100.0
B         3           100.0
C         2            50.0
          4            13.0
D         2            75.0
          3            59.0
          4            13.0
          5             4.0
Run Code Online (Sandbox Code Playgroud)

而我正试图在按位置分组的数据框中获取最大类别的项目.即如果我想要每组的前2个最大百分比,输出应该是:

location  category    percent
A         5           100.0
B         3           100.0
C         2            50.0
          4            13.0
D         2            75.0
          3            59.0
Run Code Online (Sandbox Code Playgroud)

看起来在熊猫中这是相对直接的使用,pandas.core.groupby.SeriesGroupBy.nlargest但是dask没有nlargestgroupby的功能.一直在玩,apply但似乎无法让它正常工作.

df.groupby(['location'].apply(lambda x: x['percent'].nlargest(2)).compute()

但我只是得到错误 ValueError: Wrong number of items passed 0, placement implies 8

grouping top-n pandas dask

7
推荐指数
1
解决办法
1261
查看次数

如何在 Databricks 上使用 Dask

我想在 Databricks 上使用 Dask。这应该是可能的(我不明白为什么不可以)。如果我导入它,会发生两种情况之一,要么我得到一个ImportError,但是当我安装distributed来解决这个问题时,DataBricks 只是说Cancelled没有抛出任何错误。

dask databricks dask-distributed azure-databricks

6
推荐指数
3
解决办法
5444
查看次数

如何在Dask DataFrame中创建唯一索引?

想象一下我有一个Dask来自read_csv或以其他方式创建的 DataFrame。

如何为 dask 数据框创建唯一索引?

笔记:

reset_index在每个分区中构建单调升序索引。这意味着分区 1 为 (0,1,2,3,4,5,... ),分区 2 为 (0,1,2,3,4,5,... ),(0,1,2 ,3,4,5,... ) 对于分区 3 等等。

我想要数据帧中的每一行(跨所有分区)都有一个唯一的索引。

python dataframe pandas dask dask-dataframe

6
推荐指数
1
解决办法
2185
查看次数

使用 Array<Map<String,String>> 列读取 Parquet 文件

我正在使用 Dask 读取 PySpark 生成的 Parquet 文件,其中一列是字典列表(即array<map<string,string>>')。df 的一个例子是:

import pandas as pd

df = pd.DataFrame.from_records([ 
    (1, [{'job_id': 1, 'started': '2019-07-04'}, {'job_id': 2, 'started': '2019-05-04'}], 100), 
    (5, [{'job_id': 3, 'started': '2015-06-04'}, {'job_id': 9, 'started': '2019-02-02'}], 540)], 
    columns=['uid', 'job_history', 'latency'] 
) 
Run Code Online (Sandbox Code Playgroud)

当使用 时engine='fastparquet,Dask 可以很好地读取所有其他列,但会None为具有复杂类型的列返回 s 列。当我设置时engine='pyarrow',出现以下异常:

ArrowNotImplementedError: lists with structs are not supported.
Run Code Online (Sandbox Code Playgroud)

许多谷歌搜索已经明确表明,现在并不真正支持读取带有嵌套数组的列,而且我不完全确定处理此问题的最佳方法是什么。我想我的选择是:

  • 一些如何告诉 dask/fastparquet 使用标准库解析列json。该模式很简单,如果可能的话就可以完成这项工作
  • 看看我是否可以重新运行生成输出的 Spark 作业并将其另存为其他内容,尽管这几乎不是一个可接受的解决方案,因为我的公司到处都使用镶木地板
  • 将映射的键转换为列,并使用 dtype 将数据分解为多个列,list并注意这些列中的数据通过索引相互关联/映射(例如,0这些键/列中的 idx 中的元素全部来自相同来源)。这会起作用,但坦率地说,让我心碎:(

我很想听听其他人如何克服这个限制。我的公司经常在其镶木地板中使用嵌套数组,因此我不想放弃使用 Dask。

python dask python-3.7 fastparquet pyarrow

6
推荐指数
1
解决办法
1万
查看次数

沿 XArray 的时间维度应用函数

我有一个图像堆栈存储在尺寸为时间、x、y 的 XArray DataArray 中,我想在其中沿每个像素的时间轴应用自定义函数,以便输出是尺寸为 x、y 的单个图像。

我尝试过: apply_ufunc 但该函数失败,说明我需要首先将数据加载到 RAM 中(即无法使用 Dask 数组)。理想情况下,我希望在内部将 DataArray 保留为 Dask 数组,因为不可能将整个堆栈加载到 RAM 中。确切的错误消息是:

ValueError: apply_ufunc 在参数上遇到 dask 数组,但尚未启用 dask 数组的处理。设置参数或首先使用或dask将数据加载到内存中.load().compute()

我的代码目前如下所示:

import numpy as np
import xarray as xr
import pandas as pd 

def special_mean(x, drop_min=False):
    s = np.sum(x)
    n = len(x)
    if drop_min:
    s = s - x.min()
    n -= 1
    return s/n

times = pd.date_range('2019-01-01', '2019-01-10', name='time')

data = xr.DataArray(np.random.rand(10, 8, 8), dims=["time", "y", "x"], coords={'time': times}) …
Run Code Online (Sandbox Code Playgroud)

dask python-xarray

6
推荐指数
1
解决办法
8620
查看次数

指定 das 的仪表板端口

使用dask-jobqueue创建 dask 集群时是否可以手动指定仪表板的端口?当使用8787时,它会随机选择一个不同的端口,这意味着每次都需要设置不同的隧道。

from dask_jobqueue import PBSCluster
cluster = PBSCluster() # ideally here dashboard_port=
cluster.scale(10)         

from dask.distributed import Client
client = Client(cluster)  # Connect this local process to remote workers
Run Code Online (Sandbox Code Playgroud)

python dask dask-distributed

6
推荐指数
1
解决办法
1730
查看次数