标签: dask

Dask DataFrame .head() 索引后非常慢

不可重现,但有人可以填写为什么 .head() 调用在索引后会大大减慢吗?

import dask.dataframe as dd
df = dd.read_parquet("Filepath")
df.head() # takes 10 seconds

df = df.set_index('id')

df.head() # takes 10 minutes +
Run Code Online (Sandbox Code Playgroud)

dask

4
推荐指数
1
解决办法
1916
查看次数

pandas DataFrame 中行的高效成对比较

我目前正在处理一个较小的数据集(大约 900 万行)。不幸的是,大多数条目都是字符串,即使强制类别,框架在内存中也只有几 GB。

我想做的是将每一行与其他行进行比较,并对内容进行直接比较。例如,给定

   A   B     C      D
0 cat blue  old Saturday
1 dog red   old Saturday
Run Code Online (Sandbox Code Playgroud)

我想计算

      d_A   d_B   d_C   d_D
0, 0  True  True  True  True
0, 1  False False True  True
1, 0  False False True  True
1, 1  True  True  True  True
Run Code Online (Sandbox Code Playgroud)

显然,组合爆炸将排除每个记录与其他记录的比较。因此我们可以通过应用 groupby 来使用阻塞,比如在 A 列上。

我的问题是,有没有一种方法可以在 pandas 或 dask 中执行此操作,比以下序列更快:

  1. 按索引分组
  2. 将每个组外连接到自身以生成对
  3. dataframe.apply 比较函数在每行对上

作为参考,假设我可以使用大量的核心(数百个)和大约 200G 的内存。

python pandas dask pandas-groupby

4
推荐指数
1
解决办法
5552
查看次数

将 numpy 数组转换为 dask 数据框列?

我有一个 numpy 数组,我想将其添加为现有 dask 数据框中的列。

enc = LabelEncoder()
nparr = enc.fit_transform(X[['url']])
Run Code Online (Sandbox Code Playgroud)

我有 dask 数据帧类型的 ddf。

ddf['nurl'] = nparr   ???
Run Code Online (Sandbox Code Playgroud)

请问有什么优雅的方法可以实现上述目标吗?

Python PANDAS:从 pandas/numpy 转换为 dask 数据帧/数组这并不能解决我的问题,因为我希望将 numpy 数组转换为现有的 dask 数据帧。

python numpy dask numpy-ndarray

4
推荐指数
1
解决办法
5363
查看次数

使用 dask 合并列

我目前有一个用 pandas 编写的简单脚本,我想将其转换为 dask 数据帧。
在此脚本中,我正在对用户指定列上的两个数据帧执行合并,并尝试将其转换为 dask。

def merge_dfs(df1, df2, columns):
    merged = pd.merge(df1, df2, on=columns, how='inner')
...
Run Code Online (Sandbox Code Playgroud)

如何更改此行以匹配 dask 数据帧?

python dataframe pandas dask dask-dataframe

4
推荐指数
1
解决办法
4538
查看次数

在 dask 中搜索行后获取列值

我有一个 pandas 数据框,我使用from_pandasdask 函数将其转换为 dask 数据框。它有 3 列col1,即col2、 和col3

现在我正在使用我正在搜索的daskdf[(daskdf.col1 == v1) & (daskdf.col2 == v2)]wherev1v2are 值来搜索特定行。col3但是当我尝试获取using的值时,daskdf[(daskdf.col1 == v1) & (daskdf.col2 == v2)]['col3']它给了我一个 dask 系列结构而不是列值。

在熊猫中我可以做到pandasdf[(pandasdf.col1 == v1) & (pandasdf.col2 == v2)]['col3'].tolist()。我如何获取这里的值col3

python dataframe pandas dask dask-dataframe

4
推荐指数
1
解决办法
1754
查看次数

Dask多阶段资源设置导致Failed to Serialize错误

使用 Dask 文档中的确切代码: https://jobqueue.dask.org/en/latest/examples.html

如果页面发生变化,代码如下:

from dask_jobqueue import SLURMCluster
from distributed import Client
from dask import delayed

cluster = SLURMCluster(memory='8g',
                       processes=1,
                       cores=2,
                       extra=['--resources ssdGB=200,GPU=2'])

cluster.scale(2)
client = Client(cluster)

def step_1_w_single_GPU(data):
    return "Step 1 done for: %s" % data


def step_2_w_local_IO(data):
    return "Step 2 done for: %s" % data


stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s2) for s2 in stage_1]

result_stage_2 = client.compute(stage_2,
                                resources={tuple(stage_1): {'GPU': 1},
                                           tuple(stage_2): {'ssdGB': 100}})
Run Code Online (Sandbox Code Playgroud)

这会导致这样的错误:

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback …
Run Code Online (Sandbox Code Playgroud)

python python-3.x dask dask-delayed dask-distributed

4
推荐指数
1
解决办法
244
查看次数

如何从 parquet 文件中删除 __null_dask_index ?

我正在使用Dask将 df 写入Parquet文件:

df.to_parquet(file, compression='snappy', write_metadata_file=False,\
              engine='pyarrow', index=None)
Run Code Online (Sandbox Code Playgroud)

我需要在在线镶木地板查看器中显示文件的内容,

显示的列是:

Column1  Column2  Column3  __null_dask_index__
Run Code Online (Sandbox Code Playgroud)

如何删除该__null_dask_index__列?

python dataframe parquet dask dask-dataframe

4
推荐指数
1
解决办法
1182
查看次数

计算dask.dataframe中某些值的出现

我有一个这样的数据框:

df.head()
   day      time  resource_record  
0   27  00:00:00             AAAA  
1   27  00:00:00                A  
2   27  00:00:00             AAAA  
3   27  00:00:01                A  
4   27  00:00:02                A  
Run Code Online (Sandbox Code Playgroud)

并想找出某些resource_records事物的存在次数。

我的第一个尝试是使用by返回的Series value_counts(),这看起来不错,但是由于以后没有drop()在中实现,因此我不允许以后再排除一些标签dask.Series

因此,我尝试不打印不需要的标签:

for row in df.resource_record.value_counts().iteritems():
    if row[0] in ['AAAA']:
        continue
    print('\t{0}\t{1}'.format(row[1], row[0]))
Run Code Online (Sandbox Code Playgroud)

哪个工作正常,但是如果我想进一步处理此数据并真的希望“清除”该怎么办。因此,我对文档进行了更多搜索并找到了mask(),但这也有些笨拙:

records = df.resource_record.mask(df.resource_record.map(lambda x: x in ['AAAA'])).value_counts()
Run Code Online (Sandbox Code Playgroud)

我正在寻找一种方法,该方法将允许我仅对单个值进行计数,但count()对所有非NaN值进行计数。

然后我找到了str.contains(),但是我不知道如何处理使用以下代码返回的未记录的Scalar类型:

print(df.resource_record.str.contains('A').sum())
Run Code Online (Sandbox Code Playgroud)

输出:

dd.Scalar<series-..., dtype=int64>
Run Code Online (Sandbox Code Playgroud)

但是即使在查看了Scalar的代码之后,dask/dataframe/core.py我也没有找到获得其价值的方法。

您如何有效地计算数据框中某组值的出现?

python dask

3
推荐指数
1
解决办法
2602
查看次数

将新列追加到dask数据框

这是对dash洗改数据的跟进问题。

我有一个现有的dask数据框df,希望在其中执行以下操作:

df['rand_index'] = np.random.permutation(len(df))
Run Code Online (Sandbox Code Playgroud)

但是,这会产生错误Column assignment doesn't support type ndarray。我试图使用df.assign(rand_index = np.random.permutation(len(df))它给出相同的错误。

这是一个最小的(不是)工作示例:

import pandas as pd
import dask.dataframe as dd
import numpy as np

df = dd.from_pandas(pd.DataFrame({'A':[1,2,3]*10, 'B':[3,2,1]*10}), npartitions=10)
df['rand_index'] = np.random.permutation(len(df))
Run Code Online (Sandbox Code Playgroud)

注意:

前面的问题提到了使用,df = df.map_partitions(add_random_column_to_pandas_dataframe, ...)但是我不确定这是否与该特定情况有关。

编辑1

我尝试过 df['rand_index'] = dd.from_array(np.random.permutation(len_df)),执行没有问题。当我检查时df.head(),似乎已经创建了新列。但是,当我看时df.tail()rand_index是一堆NaNs。

实际上,只是为了确认我检查了df.rand_index.max().compute()哪个结果小于len(df)-1。所以这可能df.map_partitions是发挥作用的地方,因为我怀疑这是将dask分区的问题。在我的特定情况下,我有80个分区(不涉及示例情况)。

python dask

3
推荐指数
2
解决办法
3982
查看次数

堆叠从Xarray生成的Dask数组的有效方法

因此,我正在尝试读取大量包含水文数据的相对较大的netCDF文件。NetCDF文件全部如下所示:

<xarray.Dataset>
Dimensions:         (feature_id: 2729077, reference_time: 1, time: 1)
Coordinates:
  * time            (time) datetime64[ns] 1993-01-11T21:00:00
  * reference_time  (reference_time) datetime64[ns] 1993-01-01
  * feature_id      (feature_id) int32 101 179 181 183 185 843 845 847 849 ...
Data variables:
    streamflow      (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
    q_lateral       (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
    velocity        (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
    qSfcLatRunoff   (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
    qBucket         (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
    qBtmVertRunoff  (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
Attributes:
    featureType:                timeSeries
    proj4:                      +proj=longlat +datum=NAD83 +no_defs
    model_initialization_time:  1993-01-01_00:00:00
    station_dimension:          feature_id
    model_output_valid_time:    1993-01-11_21:00:00
    stream_order_output: …
Run Code Online (Sandbox Code Playgroud)

python netcdf dask python-xarray

3
推荐指数
1
解决办法
285
查看次数