Dask的自定义图形API似乎只支持返回一个输出键/值的函数。
例如,以下依赖项无法轻松表示为 Dask 图:
B -> D
/ \
A- -> F
\ /
C -> E
Run Code Online (Sandbox Code Playgroud)
这可以通过将元组存储在“复合”键(例如在这种情况下为“B_C”)下,然后将其拆分getitem()或类似来解决。但是,这会导致执行效率低下(例如不必要的序列化)并降低 DAG 可视化的清晰度。
有没有更好的方法,或者目前不支持?
我正在尝试将 dask 数据框聚合到一组指标,包括中位数,但看起来不支持该中位数。有机会汇总并获得中位数吗?
st_agg = df.groupby(['start station id', 'end station id']).agg({'usertype':'count', 'tripduration':'median'})
>>> ValueError: unknown aggregate median
Run Code Online (Sandbox Code Playgroud) 正如有据可查的那样,Dask 在reset_index调用时会在每个分区的基础上创建一个严格递增的索引,从而导致整个集合上出现重复的索引。在 Dask 中创建严格递增索引(不必是连续的)在整个集合上的最佳方法(例如计算最快)是什么?我希望map_partitions能传递分区号,但我认为不会。谢谢。
编辑
谢谢@MRocklin,我已经做到了这一点,但我需要一些关于如何将我的系列与原始数据框重新组合的帮助。
def create_increasing_index(ddf:dd.DataFrame):
mps = int(len(ddf) / ddf.npartitions + 1000)
values = ddf.index.values
def do(x, max_partition_size, block_id=None):
length = len(x)
if length == 0:
raise ValueError("Does not work with empty partitions. Consider using dask.repartition.")
start = block_id[0] * max_partition_size
return da.arange(start, start+length, chunks=1)
series = values.map_blocks(do, max_partition_size=mps, dtype=np.int64)
ddf2 = dd.concat([ddf, dd.from_array(series)], axis=1)
return ddf2
Run Code Online (Sandbox Code Playgroud)
我收到错误“ValueError:无法将 DataFrame 与指定 axis=1 的未知除法连接”。有没有比使用 dd.concat 更好的方法?谢谢。
再次编辑
实际上,就我的目的而言(以及我测试的数据量 - 只有几 GB),cumsum 已经足够快了。当这变得太慢时我会重新访问!
我正在尝试在 Dask 上使用 Pivot_table 和以下数据框:
date store_nbr item_nbr unit_sales year month
0 2013-01-01 25 103665 7.0 2013 1
1 2013-01-01 25 105574 1.0 2013 1
2 2013-01-01 25 105575 2.0 2013 1
3 2013-01-01 25 108079 1.0 2013 1
4 2013-01-01 25 108701 1.0 2013 1
Run Code Online (Sandbox Code Playgroud)
当我尝试将 pivot_table 如下:
ddf.pivot_table(values='unit_sales', index={'store_nbr','item_nbr'},
columns={'year','month'}, aggfunc={'mean','sum'})
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
ValueError: 'index' must be the name of an existing column
Run Code Online (Sandbox Code Playgroud)
如果我只在索引和列参数上使用一个值,如下所示:
df.pivot_table(values='unit_sales', index='store_nbr',
columns='year', aggfunc={'sum'})
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
ValueError: 'columns' must be category dtype
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用s3fs库和 pandas在 S3 上将数据帧编写为 CSV 文件。尽管有文档,但恐怕 gzip 压缩参数不适用于 s3fs。
def DfTos3Csv (df,file):
with fs.open(file,'wb') as f:
df.to_csv(f, compression='gzip', index=False)
Run Code Online (Sandbox Code Playgroud)
此代码将数据帧保存为 S3 中的新对象,但保存为纯 CSV 而非 gzip 格式。另一方面,使用此压缩参数可以正常工作的读取功能。
def s3CsvToDf(file):
with fs.open(file) as f:
df = pd.read_csv(f, compression='gzip')
return df
Run Code Online (Sandbox Code Playgroud)
写入问题的建议/替代方案?先感谢您!。
我正在尝试使用dask read_parquet方法和filterskwarg读取镶木地板文件。但是它有时不会根据给定的条件进行过滤。
示例:使用dates列创建和保存数据框
import pandas as pd
import numpy as np
import dask.dataframe as dd
nums = range(1,6)
dates = pd.date_range('2018-07-01', periods=5, freq='1d')
df = pd.DataFrame({'dates':dates, 'nums': nums})
ddf = dd.from_pandas(df, npartitions=3).to_parquet('test_par', engine = 'fastparquet')
Run Code Online (Sandbox Code Playgroud)
当我dates从'test_par'文件夹中读取和过滤列时,它似乎不起作用
filters=[('dates', '>', np.datetime64('2018-07-04'))]
df = dd.read_parquet('test_par', engine='fastparquet', filters=filters).compute()
Run Code Online (Sandbox Code Playgroud)
正如您在输出中看到的那样,2018-07-03并且2018-07-04存在。
+-------+------------+------+
| | dates | nums |
+-------+------------+------+
| index | | |
+-------+------------+------+
| 2 | 2018-07-03 | 3 | …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 dask 进行一些令人尴尬的并行处理。出于某种原因,我必须使用 dask,但使用multiprocessing.Pool(5).map.
例如:
import dask
from dask import compute, delayed
def do_something(x): return x * x
data = range(10)
delayed_values = [delayed(do_something)(x) for x in data]
results = compute(*delayed_values, scheduler='processes')
Run Code Online (Sandbox Code Playgroud)
它有效,但显然它只使用一个过程。
如何配置 dask 以便它使用 5 个进程池进行此计算?
我对 xarray 相当陌生,我目前正在尝试利用它来对某些 NetCDF 进行子集化。我在共享服务器上运行它,想知道如何最好地限制 xarray 使用的处理能力,以便它与其他人很好地合作。我已经阅读了 dask 和 xarray 文档,但我似乎不清楚如何设置 cpus/线程的上限。以下是空间子集的示例:
import glob
import os
import xarray as xr
from multiprocessing.pool import ThreadPool
import dask
wd = os.getcwd()
test_data = os.path.join(wd, 'test_data')
lat_bnds = (43, 50)
lon_bnds = (-67, -80)
output = 'test_data_subset'
def subset_nc(ncfile, lat_bnds, lon_bnds, output):
if not glob.os.path.exists(output):
glob.os.makedirs(output)
outfile = os.path.join(output, os.path.basename(ncfile).replace('.nc', '_subset.nc'))
with dask.config.set(scheduler='threads', pool=ThreadPool(5)):
ds = xr.open_dataset(ncfile, decode_times=False)
ds_sub = ds.where(
(ds.lon >= min(lon_bnds)) & (ds.lon <= max(lon_bnds)) & (ds.lat >= min(lat_bnds)) & …Run Code Online (Sandbox Code Playgroud) 我有两个关于 dask 的问题。首先:dask 的文档明确指出您可以使用与 Pandas 相同的语法重命名列。我正在使用 dask 1.0.0。我在下面收到这些错误的任何原因?
df = pd.DataFrame(dictionary)
df
Run Code Online (Sandbox Code Playgroud)
# I am not sure how to choose values for divisions, meta, and name. I am also pretty unsure about what these really do.
ddf = dd.DataFrame(dictionary, divisions=[8], meta=pd.DataFrame(dictionary), name='ddf')
ddf
Run Code Online (Sandbox Code Playgroud)
cols = {'Key':'key', '0':'Datetime','1':'col1','2':'col2','3':'col3','4':'col4','5':'col5'}
ddf.rename(columns=cols, inplace=True)
TypeError: rename() got an unexpected keyword argument 'inplace'
Run Code Online (Sandbox Code Playgroud)
好的,所以我删除了inplace=True并尝试了这个:
ddf = ddf.rename(columns=cols)
ValueError: dictionary update sequence element #0 has length 6; 2 is required
Run Code Online (Sandbox Code Playgroud)
pandas 数据框显示了一个真实的数据框,但是当我打电话时,ddf.compute()我得到了一个空的数据框。
My …
以下LocalCluster配置有dask.distributed什么区别?
Client(n_workers=4, processes=False, threads_per_worker=1)
Run Code Online (Sandbox Code Playgroud)
相对
Client(n_workers=1, processes=True, threads_per_worker=4)
Run Code Online (Sandbox Code Playgroud)
它们都有四个线程处理任务图,但第一个线程有四个工作线程。那么,与具有多个线程的单个工作人员相比,让多个工作人员充当线程有什么好处?
编辑:只是澄清一下,我知道进程、线程和共享内存之间的区别,所以这个问题更多地针对这两个客户端的配置差异。
dask ×10
python ×9
dataframe ×2
amazon-s3 ×1
fastparquet ×1
filtering ×1
netcdf4 ×1
pandas ×1
pivot-table ×1
python-3.x ×1
python-s3fs ×1