是否有可能迭代一个dask GroupBy对象来访问底层数据帧?我试过了:
import dask.dataframe as dd
import pandas as pd
pdf = pd.DataFrame({'A':[1,2,3,4,5], 'B':['1','1','a','a','a']})
ddf = dd.from_pandas(pdf, npartitions = 3)
groups = ddf.groupby('B')
for name, df in groups:
print(name)
Run Code Online (Sandbox Code Playgroud)
但是,这会导致错误: KeyError: 'Column not found: 0'
更一般地说,除了apply方法之外,dask GroupBy对象允许哪种交互?
有没有办法读取通过gz压缩到dask数据帧的.csv文件?
我直接尝试过
import dask.dataframe as dd
df = dd.read_csv("Data.gz" )
Run Code Online (Sandbox Code Playgroud)
但得到一个unicode错误(可能是因为它正在解释压缩的字节)有一个"compression"参数但compression = "gz"不起作用,到目前为止我找不到任何文档.
使用pandas我可以直接读取文件而不会产生问题,除了结果炸毁了我的记忆;-)但是如果我限制行数它可以正常工作.
import pandas.Dataframe as pd
df = pd.read_csv("Data.gz", ncols=100)
Run Code Online (Sandbox Code Playgroud) Dask.distributed部署有数百个工作节点的传闻吗?分布是否意味着扩展到这个大小的集群?
我想基于2个现有列的值向现有的dask数据帧添加一个新列,并涉及一个用于检查空值的条件语句:
DataFrame定义
import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [0.2, "", 0.345, 0.40, 0.15]})
ddf = dd.from_pandas(df1, npartitions=2)
Run Code Online (Sandbox Code Playgroud)
方法-1尝试过
def funcUpdate(row):
if row['y'].isnull():
return row['y']
else:
return round((1 + row['x'])/(1+ 1/row['y']),4)
ddf = ddf.assign(z= ddf.apply(funcUpdate, axis=1 , meta = ddf))
Run Code Online (Sandbox Code Playgroud)
它给出了一个错误:
TypeError: Column assignment doesn't support type DataFrame
Run Code Online (Sandbox Code Playgroud)
方法2
ddf = ddf.assign(z = ddf.apply(lambda col: col.y if col.y.isnull() else round((1 + col.x)/(1+ 1/col.y),4),axis = 1, meta = ddf))
Run Code Online (Sandbox Code Playgroud)
知道应该怎么做吗?
我有以下数据集:
location category percent
A 5 100.0
B 3 100.0
C 2 50.0
4 13.0
D 2 75.0
3 59.0
4 13.0
5 4.0
Run Code Online (Sandbox Code Playgroud)
而我正试图在按位置分组的数据框中获取最大类别的项目.即如果我想要每组的前2个最大百分比,输出应该是:
location category percent
A 5 100.0
B 3 100.0
C 2 50.0
4 13.0
D 2 75.0
3 59.0
Run Code Online (Sandbox Code Playgroud)
看起来在熊猫中这是相对直接的使用,pandas.core.groupby.SeriesGroupBy.nlargest但是dask没有nlargestgroupby的功能.一直在玩,apply但似乎无法让它正常工作.
df.groupby(['location'].apply(lambda x: x['percent'].nlargest(2)).compute()
但我只是得到错误 ValueError: Wrong number of items passed 0, placement implies 8
我想在 Databricks 上使用 Dask。这应该是可能的(我不明白为什么不可以)。如果我导入它,会发生两种情况之一,要么我得到一个ImportError,但是当我安装distributed来解决这个问题时,DataBricks 只是说Cancelled没有抛出任何错误。
想象一下我有一个Dask来自read_csv或以其他方式创建的 DataFrame。
如何为 dask 数据框创建唯一索引?
笔记:
reset_index在每个分区中构建单调升序索引。这意味着分区 1 为 (0,1,2,3,4,5,... ),分区 2 为 (0,1,2,3,4,5,... ),(0,1,2 ,3,4,5,... ) 对于分区 3 等等。
我想要数据帧中的每一行(跨所有分区)都有一个唯一的索引。
我正在使用 Dask 读取 PySpark 生成的 Parquet 文件,其中一列是字典列表(即array<map<string,string>>')。df 的一个例子是:
import pandas as pd
df = pd.DataFrame.from_records([
(1, [{'job_id': 1, 'started': '2019-07-04'}, {'job_id': 2, 'started': '2019-05-04'}], 100),
(5, [{'job_id': 3, 'started': '2015-06-04'}, {'job_id': 9, 'started': '2019-02-02'}], 540)],
columns=['uid', 'job_history', 'latency']
)
Run Code Online (Sandbox Code Playgroud)
当使用 时engine='fastparquet,Dask 可以很好地读取所有其他列,但会None为具有复杂类型的列返回 s 列。当我设置时engine='pyarrow',出现以下异常:
ArrowNotImplementedError: lists with structs are not supported.
Run Code Online (Sandbox Code Playgroud)
许多谷歌搜索已经明确表明,现在并不真正支持读取带有嵌套数组的列,而且我不完全确定处理此问题的最佳方法是什么。我想我的选择是:
json。该模式很简单,如果可能的话就可以完成这项工作list并注意这些列中的数据通过索引相互关联/映射(例如,0这些键/列中的 idx 中的元素全部来自相同来源)。这会起作用,但坦率地说,让我心碎:(我很想听听其他人如何克服这个限制。我的公司经常在其镶木地板中使用嵌套数组,因此我不想放弃使用 Dask。
我有一个图像堆栈存储在尺寸为时间、x、y 的 XArray DataArray 中,我想在其中沿每个像素的时间轴应用自定义函数,以便输出是尺寸为 x、y 的单个图像。
我尝试过: apply_ufunc 但该函数失败,说明我需要首先将数据加载到 RAM 中(即无法使用 Dask 数组)。理想情况下,我希望在内部将 DataArray 保留为 Dask 数组,因为不可能将整个堆栈加载到 RAM 中。确切的错误消息是:
ValueError: apply_ufunc 在参数上遇到 dask 数组,但尚未启用 dask 数组的处理。设置参数或首先使用或
dask将数据加载到内存中.load().compute()
我的代码目前如下所示:
import numpy as np
import xarray as xr
import pandas as pd
def special_mean(x, drop_min=False):
s = np.sum(x)
n = len(x)
if drop_min:
s = s - x.min()
n -= 1
return s/n
times = pd.date_range('2019-01-01', '2019-01-10', name='time')
data = xr.DataArray(np.random.rand(10, 8, 8), dims=["time", "y", "x"], coords={'time': times}) …Run Code Online (Sandbox Code Playgroud) 使用dask-jobqueue创建 dask 集群时是否可以手动指定仪表板的端口?当使用8787时,它会随机选择一个不同的端口,这意味着每次都需要设置不同的隧道。
from dask_jobqueue import PBSCluster
cluster = PBSCluster() # ideally here dashboard_port=
cluster.scale(10)
from dask.distributed import Client
client = Client(cluster) # Connect this local process to remote workers
Run Code Online (Sandbox Code Playgroud) dask ×10
python ×7
pandas ×5
csv ×1
databricks ×1
dataframe ×1
fastparquet ×1
grouping ×1
pyarrow ×1
python-3.7 ×1
top-n ×1