不可重现,但有人可以填写为什么 .head() 调用在索引后会大大减慢吗?
import dask.dataframe as dd
df = dd.read_parquet("Filepath")
df.head() # takes 10 seconds
df = df.set_index('id')
df.head() # takes 10 minutes +
Run Code Online (Sandbox Code Playgroud) 我目前正在处理一个较小的数据集(大约 900 万行)。不幸的是,大多数条目都是字符串,即使强制类别,框架在内存中也只有几 GB。
我想做的是将每一行与其他行进行比较,并对内容进行直接比较。例如,给定
A B C D
0 cat blue old Saturday
1 dog red old Saturday
Run Code Online (Sandbox Code Playgroud)
我想计算
d_A d_B d_C d_D
0, 0 True True True True
0, 1 False False True True
1, 0 False False True True
1, 1 True True True True
Run Code Online (Sandbox Code Playgroud)
显然,组合爆炸将排除每个记录与其他记录的比较。因此我们可以通过应用 groupby 来使用阻塞,比如在 A 列上。
我的问题是,有没有一种方法可以在 pandas 或 dask 中执行此操作,比以下序列更快:
作为参考,假设我可以使用大量的核心(数百个)和大约 200G 的内存。
我有一个 numpy 数组,我想将其添加为现有 dask 数据框中的列。
enc = LabelEncoder()
nparr = enc.fit_transform(X[['url']])
Run Code Online (Sandbox Code Playgroud)
我有 dask 数据帧类型的 ddf。
ddf['nurl'] = nparr ???
Run Code Online (Sandbox Code Playgroud)
请问有什么优雅的方法可以实现上述目标吗?
Python PANDAS:从 pandas/numpy 转换为 dask 数据帧/数组这并不能解决我的问题,因为我希望将 numpy 数组转换为现有的 dask 数据帧。
我目前有一个用 pandas 编写的简单脚本,我想将其转换为 dask 数据帧。
在此脚本中,我正在对用户指定列上的两个数据帧执行合并,并尝试将其转换为 dask。
def merge_dfs(df1, df2, columns):
merged = pd.merge(df1, df2, on=columns, how='inner')
...
Run Code Online (Sandbox Code Playgroud)
如何更改此行以匹配 dask 数据帧?
我有一个 pandas 数据框,我使用from_pandasdask 函数将其转换为 dask 数据框。它有 3 列col1,即col2、 和col3。
现在我正在使用我正在搜索的daskdf[(daskdf.col1 == v1) & (daskdf.col2 == v2)]wherev1和v2are 值来搜索特定行。col3但是当我尝试获取using的值时,daskdf[(daskdf.col1 == v1) & (daskdf.col2 == v2)]['col3']它给了我一个 dask 系列结构而不是列值。
在熊猫中我可以做到pandasdf[(pandasdf.col1 == v1) & (pandasdf.col2 == v2)]['col3'].tolist()。我如何获取这里的值col3?
使用 Dask 文档中的确切代码: https://jobqueue.dask.org/en/latest/examples.html
如果页面发生变化,代码如下:
from dask_jobqueue import SLURMCluster
from distributed import Client
from dask import delayed
cluster = SLURMCluster(memory='8g',
processes=1,
cores=2,
extra=['--resources ssdGB=200,GPU=2'])
cluster.scale(2)
client = Client(cluster)
def step_1_w_single_GPU(data):
return "Step 1 done for: %s" % data
def step_2_w_local_IO(data):
return "Step 2 done for: %s" % data
stage_1 = [delayed(step_1_w_single_GPU)(i) for i in range(10)]
stage_2 = [delayed(step_2_w_local_IO)(s2) for s2 in stage_1]
result_stage_2 = client.compute(stage_2,
resources={tuple(stage_1): {'GPU': 1},
tuple(stage_2): {'ssdGB': 100}})
Run Code Online (Sandbox Code Playgroud)
这会导致这样的错误:
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback …Run Code Online (Sandbox Code Playgroud) 我正在使用Dask将 df 写入Parquet文件:
df.to_parquet(file, compression='snappy', write_metadata_file=False,\
engine='pyarrow', index=None)
Run Code Online (Sandbox Code Playgroud)
我需要在在线镶木地板查看器中显示文件的内容,
显示的列是:
Column1 Column2 Column3 __null_dask_index__
Run Code Online (Sandbox Code Playgroud)
如何删除该__null_dask_index__列?
我有一个这样的数据框:
df.head()
day time resource_record
0 27 00:00:00 AAAA
1 27 00:00:00 A
2 27 00:00:00 AAAA
3 27 00:00:01 A
4 27 00:00:02 A
Run Code Online (Sandbox Code Playgroud)
并想找出某些resource_records事物的存在次数。
我的第一个尝试是使用by返回的Series value_counts(),这看起来不错,但是由于以后没有drop()在中实现,因此我不允许以后再排除一些标签dask.Series。
因此,我尝试不打印不需要的标签:
for row in df.resource_record.value_counts().iteritems():
if row[0] in ['AAAA']:
continue
print('\t{0}\t{1}'.format(row[1], row[0]))
Run Code Online (Sandbox Code Playgroud)
哪个工作正常,但是如果我想进一步处理此数据并真的希望“清除”该怎么办。因此,我对文档进行了更多搜索并找到了mask(),但这也有些笨拙:
records = df.resource_record.mask(df.resource_record.map(lambda x: x in ['AAAA'])).value_counts()
Run Code Online (Sandbox Code Playgroud)
我正在寻找一种方法,该方法将允许我仅对单个值进行计数,但count()对所有非NaN值进行计数。
然后我找到了str.contains(),但是我不知道如何处理使用以下代码返回的未记录的Scalar类型:
print(df.resource_record.str.contains('A').sum())
Run Code Online (Sandbox Code Playgroud)
输出:
dd.Scalar<series-..., dtype=int64>
Run Code Online (Sandbox Code Playgroud)
但是即使在查看了Scalar的代码之后,dask/dataframe/core.py我也没有找到获得其价值的方法。
您如何有效地计算数据框中某组值的出现?
这是对dash洗改数据的跟进问题。
我有一个现有的dask数据框df,希望在其中执行以下操作:
df['rand_index'] = np.random.permutation(len(df))
Run Code Online (Sandbox Code Playgroud)
但是,这会产生错误Column assignment doesn't support type ndarray。我试图使用df.assign(rand_index = np.random.permutation(len(df))它给出相同的错误。
这是一个最小的(不是)工作示例:
import pandas as pd
import dask.dataframe as dd
import numpy as np
df = dd.from_pandas(pd.DataFrame({'A':[1,2,3]*10, 'B':[3,2,1]*10}), npartitions=10)
df['rand_index'] = np.random.permutation(len(df))
Run Code Online (Sandbox Code Playgroud)
前面的问题提到了使用,df = df.map_partitions(add_random_column_to_pandas_dataframe, ...)但是我不确定这是否与该特定情况有关。
我尝试过
df['rand_index'] = dd.from_array(np.random.permutation(len_df)),执行没有问题。当我检查时df.head(),似乎已经创建了新列。但是,当我看时df.tail(),rand_index是一堆NaNs。
实际上,只是为了确认我检查了df.rand_index.max().compute()哪个结果小于len(df)-1。所以这可能df.map_partitions是发挥作用的地方,因为我怀疑这是将dask分区的问题。在我的特定情况下,我有80个分区(不涉及示例情况)。
因此,我正在尝试读取大量包含水文数据的相对较大的netCDF文件。NetCDF文件全部如下所示:
<xarray.Dataset>
Dimensions: (feature_id: 2729077, reference_time: 1, time: 1)
Coordinates:
* time (time) datetime64[ns] 1993-01-11T21:00:00
* reference_time (reference_time) datetime64[ns] 1993-01-01
* feature_id (feature_id) int32 101 179 181 183 185 843 845 847 849 ...
Data variables:
streamflow (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
q_lateral (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
velocity (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
qSfcLatRunoff (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
qBucket (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
qBtmVertRunoff (feature_id) float64 dask.array<shape=(2729077,), chunksize=(50000,)>
Attributes:
featureType: timeSeries
proj4: +proj=longlat +datum=NAD83 +no_defs
model_initialization_time: 1993-01-01_00:00:00
station_dimension: feature_id
model_output_valid_time: 1993-01-11_21:00:00
stream_order_output: …Run Code Online (Sandbox Code Playgroud) dask ×10
python ×9
dataframe ×3
pandas ×3
dask-delayed ×1
netcdf ×1
numpy ×1
parquet ×1
python-3.x ×1