标签: dask

Dask 是否支持自定义图形中具有多个输出的函数?

Dask自定义图形API似乎只支持返回一个输出键/值的函数。

例如,以下依赖项无法轻松表示为 Dask 图:

    B -> D
   /      \
A-         -> F
   \      /
    C -> E
Run Code Online (Sandbox Code Playgroud)

这可以通过将元组存储在“复合”键(例如在这种情况下为“B_C”)下,然后将其拆分getitem()或类似来解决。但是,这会导致执行效率低下(例如不必要的序列化)并降低 DAG 可视化的清晰度。

有没有更好的方法,或者目前不支持?

python dask

7
推荐指数
1
解决办法
931
查看次数

Dask DataFrame 聚合到中位数

我正在尝试将 dask 数据框聚合到一组指标,包括中位数,但看起来不支持该中位数。有机会汇总并获得中位数吗?

st_agg = df.groupby(['start station id', 'end station id']).agg({'usertype':'count', 'tripduration':'median'})

>>> ValueError: unknown aggregate median
Run Code Online (Sandbox Code Playgroud)

python dask

7
推荐指数
1
解决办法
2240
查看次数

Dask:创建严格递增索引

正如有据可查的那样,Dask 在reset_index调用时会在每个分区的基础上创建一个严格递增的索引,从而导致整个集合上出现重复的索引。在 Dask 中创建严格递增索引(不必是连续的)在整个集合上的最佳方法(例如计算最快)是什么?我希望map_partitions能传递分区号,但我认为不会。谢谢。

编辑

谢谢@MRocklin,我已经做到了这一点,但我需要一些关于如何将我的系列与原始数据框重新组合的帮助。

def create_increasing_index(ddf:dd.DataFrame):
    mps = int(len(ddf) / ddf.npartitions + 1000)
    values = ddf.index.values

    def do(x, max_partition_size, block_id=None):
        length = len(x)
        if length == 0:
            raise ValueError("Does not work with empty partitions. Consider using dask.repartition.")

        start = block_id[0] * max_partition_size
        return da.arange(start, start+length, chunks=1)

    series = values.map_blocks(do, max_partition_size=mps, dtype=np.int64)
    ddf2 = dd.concat([ddf, dd.from_array(series)], axis=1)
    return ddf2
Run Code Online (Sandbox Code Playgroud)

我收到错误“ValueError:无法将 DataFrame 与指定 axis=1 的未知除法连接”。有没有比使用 dd.concat 更好的方法?谢谢。

再次编辑

实际上,就我的目的而言(以及我测试的数据量 - 只有几 GB),cumsum 已经足够快了。当这变得太慢时我会重新访问!

python python-3.x dask

7
推荐指数
1
解决办法
1452
查看次数

如何使用 Dask Pivot_table?

我正在尝试在 Dask 上使用 Pivot_table 和以下数据框:

    date    store_nbr   item_nbr    unit_sales  year    month
0   2013-01-01  25       103665      7.0        2013      1
1   2013-01-01  25       105574      1.0        2013      1
2   2013-01-01  25       105575      2.0        2013      1
3   2013-01-01  25       108079      1.0        2013      1
4   2013-01-01  25       108701      1.0        2013      1
Run Code Online (Sandbox Code Playgroud)

当我尝试将 pivot_table 如下:

ddf.pivot_table(values='unit_sales', index={'store_nbr','item_nbr'}, 
                                  columns={'year','month'}, aggfunc={'mean','sum'})
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

ValueError: 'index' must be the name of an existing column
Run Code Online (Sandbox Code Playgroud)

如果我只在索引和列参数上使用一个值,如下所示:

df.pivot_table(values='unit_sales', index='store_nbr', 
                                  columns='year', aggfunc={'sum'})
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

ValueError: 'columns' must be category dtype
Run Code Online (Sandbox Code Playgroud)

pivot-table dataframe dask

7
推荐指数
1
解决办法
5774
查看次数

pandas 数据帧上的 s3fs gzip 压缩

我正在尝试使用s3fs库和 pandas在 S3 上将数据帧编写为 CSV 文件。尽管有文档,但恐怕 gzip 压缩参数不适用于 s3fs。

def DfTos3Csv (df,file):
    with fs.open(file,'wb') as f:
       df.to_csv(f, compression='gzip', index=False)
Run Code Online (Sandbox Code Playgroud)

此代码将数据帧保存为 S3 中的新对象,但保存为纯 CSV 而非 gzip 格式。另一方面,使用此压缩参数可以正常工作的读取功能。

def s3CsvToDf(file):
   with fs.open(file) as f:
      df = pd.read_csv(f, compression='gzip')
  return df
Run Code Online (Sandbox Code Playgroud)

写入问题的建议/替代方案?先感谢您!。

python amazon-s3 dask python-s3fs

7
推荐指数
1
解决办法
1722
查看次数

使用 dask read_parquet 方法过滤会产生不需要的结果

我正在尝试使用dask read_parquet方法和filterskwarg读取镶木地板文件。但是它有时不会根据给定的条件进行过滤。

示例:使用dates列创建和保存数据框

import pandas as pd
import numpy as np
import dask.dataframe as dd

nums  = range(1,6)
dates = pd.date_range('2018-07-01', periods=5, freq='1d')
df = pd.DataFrame({'dates':dates, 'nums': nums})

ddf = dd.from_pandas(df, npartitions=3).to_parquet('test_par', engine = 'fastparquet')
Run Code Online (Sandbox Code Playgroud)

当我dates'test_par'文件夹中读取和过滤列时,它似乎不起作用

filters=[('dates', '>', np.datetime64('2018-07-04'))]
df  = dd.read_parquet('test_par', engine='fastparquet', filters=filters).compute()
Run Code Online (Sandbox Code Playgroud)

正如您在输出中看到的那样,2018-07-03并且2018-07-04存在。

+-------+------------+------+
|       | dates      | nums |
+-------+------------+------+
| index |            |      |
+-------+------------+------+
| 2     | 2018-07-03 | 3    | …
Run Code Online (Sandbox Code Playgroud)

python filtering dataframe dask fastparquet

7
推荐指数
1
解决办法
2853
查看次数

dask:指定进程数

我正在尝试使用 dask 进行一些令人尴尬的并行处理。出于某种原因,我必须使用 dask,但使用multiprocessing.Pool(5).map.

例如:

import dask
from dask import compute, delayed

def do_something(x): return x * x

data = range(10)
delayed_values = [delayed(do_something)(x) for x in data]
results = compute(*delayed_values, scheduler='processes')
Run Code Online (Sandbox Code Playgroud)

它有效,但显然它只使用一个过程。

如何配置 dask 以便它使用 5 个进程池进行此计算?

python dask

7
推荐指数
2
解决办法
3350
查看次数

xarray/dask - 限制线程数/CPU

我对 xarray 相当陌生,我目前正在尝试利用它来对某些 NetCDF 进行子集化。我在共享服务器上运行它,想知道如何最好地限制 xarray 使用的处理能力,以便它与其他人很好地合作。我已经阅读了 dask 和 xarray 文档,但我似乎不清楚如何设置 cpus/线程的上限。以下是空间子集的示例:

import glob
import os
import xarray as xr

from multiprocessing.pool import ThreadPool
import dask

wd = os.getcwd()

test_data = os.path.join(wd, 'test_data')
lat_bnds = (43, 50)
lon_bnds = (-67, -80)
output = 'test_data_subset'

def subset_nc(ncfile, lat_bnds, lon_bnds, output):
    if not glob.os.path.exists(output):
        glob.os.makedirs(output)
    outfile = os.path.join(output, os.path.basename(ncfile).replace('.nc', '_subset.nc'))

    with dask.config.set(scheduler='threads', pool=ThreadPool(5)):
        ds = xr.open_dataset(ncfile, decode_times=False)

        ds_sub = ds.where(
            (ds.lon >= min(lon_bnds)) & (ds.lon <= max(lon_bnds)) & (ds.lat >= min(lat_bnds)) & …
Run Code Online (Sandbox Code Playgroud)

python multiprocessing dask python-xarray netcdf4

7
推荐指数
1
解决办法
235
查看次数

重命名 dask 数据框中的列

我有两个关于 dask 的问题。首先:dask 的文档明确指出您可以使用与 Pandas 相同的语法重命名列。我正在使用 dask 1.0.0。我在下面收到这些错误的任何原因?

df = pd.DataFrame(dictionary)
df
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

# I am not sure how to choose values for divisions, meta, and name. I am also pretty unsure about what these really do.
ddf = dd.DataFrame(dictionary, divisions=[8], meta=pd.DataFrame(dictionary), name='ddf')    
ddf
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

cols = {'Key':'key', '0':'Datetime','1':'col1','2':'col2','3':'col3','4':'col4','5':'col5'}

ddf.rename(columns=cols, inplace=True)

TypeError: rename() got an unexpected keyword argument 'inplace'
Run Code Online (Sandbox Code Playgroud)

好的,所以我删除了inplace=True并尝试了这个:

ddf = ddf.rename(columns=cols)

ValueError: dictionary update sequence element #0 has length 6; 2 is required
Run Code Online (Sandbox Code Playgroud)

pandas 数据框显示了一个真实的数据框,但是当我打电话时,ddf.compute()我得到了一个空的数据框。

在此处输入图片说明

My …

python pandas dask

7
推荐指数
1
解决办法
8452
查看次数

dask.distributed LocalCluster 与线程与进程之间的区别

以下LocalCluster配置有dask.distributed什么区别?

Client(n_workers=4, processes=False, threads_per_worker=1)
Run Code Online (Sandbox Code Playgroud)

相对

Client(n_workers=1, processes=True, threads_per_worker=4)
Run Code Online (Sandbox Code Playgroud)

它们都有四个线程处理任务图,但第一个线程有四个工作线程。那么,与具有多个线程的单个工作人员相比,让多个工作人员充当线程有什么好处?

编辑:只是澄清一下,我知道进程、线程和共享内存之间的区别,所以这个问题更多地针对这两个客户端的配置差异。

python dask dask-distributed

7
推荐指数
3
解决办法
2461
查看次数