标签: dask-dataframe

如何在Dask DataFrame中创建唯一索引?

想象一下我有一个Dask来自read_csv或以其他方式创建的 DataFrame。

如何为 dask 数据框创建唯一索引?

笔记:

reset_index在每个分区中构建单调升序索引。这意味着分区 1 为 (0,1,2,3,4,5,... ),分区 2 为 (0,1,2,3,4,5,... ),(0,1,2 ,3,4,5,... ) 对于分区 3 等等。

我想要数据帧中的每一行(跨所有分区)都有一个唯一的索引。

python dataframe pandas dask dask-dataframe

6
推荐指数
1
解决办法
2185
查看次数

AttributeError:导入 Dask 时模块“pandas.core.strings”没有属性“StringMethods”

dask.dataframe尽管import dask有效,但在尝试导入接口时,我收到问题标题中所述的错误。

我当前的 dask 版本是2022.7.0. 可能是什么问题?

pandas dask dask-distributed dask-dataframe

6
推荐指数
1
解决办法
1万
查看次数

从 Python Pandas / Dask 中的 Parquet 文件读取一组行?

我有一个与此类似的 Pandas 数据框:

datetime                 data1  data2
2021-01-23 00:00:31.140     a1     a2
2021-01-23 00:00:31.140     b1     b2       
2021-01-23 00:00:31.140     c1     c2
2021-01-23 00:01:29.021     d1     d2
2021-01-23 00:02:10.540     e1     e2
2021-01-23 00:02:10.540     f1     f2
Run Code Online (Sandbox Code Playgroud)

真实的数据帧非常大,对于每个唯一的时间戳,都有几千行。

我想将此数据帧保存到 Parquet 文件中,以便我可以快速读取具有特定日期时间索引的所有行,而无需加载整个文件或遍历它。如何在 Python 中正确保存它,以及如何快速仅读取一个特定日期时间的行?

阅读后,我想要一个新的数据框,其中包含该特定日期时间的所有行。例如,我只想从 Parquet 文件中读取 datetime "2021-01-23 00:00:31.140" 的行并接收此数据帧:

datetime                 data1  data2
2021-01-23 00:00:31.140     a1     a2
2021-01-23 00:00:31.140     b1     b2        
2021-01-23 00:00:31.140     c1     c2
Run Code Online (Sandbox Code Playgroud)

我想知道它可能首先需要将每个时间戳的数据转换为一列,像这样,以便可以通过读取列而不是行来访问它?

2021-01-23 00:00:31.140  2021-01-23 00:01:29.021  2021-01-23 00:02:10.540
           ['a1', 'a2']             ['d1', 'd2']             ['e1', 'e2']
           ['b1', 'b2']                      NaN             ['f1', 'f2']
           ['c1', 'c2']                      NaN                      NaN …
Run Code Online (Sandbox Code Playgroud)

python pandas parquet dask dask-dataframe

5
推荐指数
1
解决办法
454
查看次数

使用 Dask 高效地按部分读取大 csv 文件

现在,我正在使用 Dask 读取大型 csv 文件,并对其进行一些后处理(例如,进行一些数学运算,然后通过某些 ML 模型进行预测并将结果写入数据库)。避免将所有数据加载到内存中,我想按当前大小的块读取:读取第一个块,预测,写入,读取第二个块等。

我使用skiprowsand尝试了下一个解决方案nrows

import dask.dataframe as dd
read_path = "medium.csv"

# Read by chunk
skiprows = 100000
nrows = 50000
res_df = dd.read_csv(read_path, skiprows=skiprows)
res_df = res_df.head(nrows)

print(res_df.shape)
print(res_df.head())
Run Code Online (Sandbox Code Playgroud)

但我收到错误:

ValueError:样本不够大,无法包含至少一行数据。sample请增加调用中的 字节数read_csv/read_table

另外,据我了解,它每次都会为所有数据计算二进制掩码([False,False,...,True,...])以查找要加载的行。我们怎样才能更有效地做到这一点?也许使用 dask 中的一些分布式或延迟函数?

python csv dask dask-dataframe

5
推荐指数
1
解决办法
6351
查看次数

Dask 分布式 KeyError

我正在尝试使用一个小例子来学习 Dask。基本上我读入一个文件并计算行平均值。

from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(cores=4, memory='24 GB')

cluster.scale(4)

from dask.distributed import Client
client = Client(cluster)

import dask
import numpy as np
import dask.dataframe as dd

mytbl = dd.read_csv('test.txt', sep=' ')
row_mean = mytbl.loc[:, mytbl.columns != 'chrom'].apply(np.mean, axis=1, meta=(None, 'float64'))
row_mean = row_mean.compute()
Run Code Online (Sandbox Code Playgroud)

当我运行时compute,我可以在Dask仪表板中看到内存使用量增加非常快,并且所有CPU也都被使用。但随后内存增加停止,我看到这个错误:

distributed.utils - ERROR - "('read-csv-apply-67c8f79a72a7499f86a1707b269e0776', 0)"
Traceback (most recent call last):
  File "~/miniconda3/lib/python3.8/site-packages/distributed/utils.py", line 668, in log_errors
    yield
  File "~/miniconda3/lib/python3.8/site-packages/distributed/scheduler.py", line 3785, in add_worker
    typename=types[key],
KeyError: "('read-csv-apply-67c8f79a72a7499f86a1707b269e0776', 0)"
distributed.core - ERROR …
Run Code Online (Sandbox Code Playgroud)

python dask dask-distributed dask-dataframe dask-jobqueue

5
推荐指数
0
解决办法
1175
查看次数

在 Dask 中实施等宽间隔特征工程

在等宽离散化中,变量值被分配到相同宽度的区间。间隔的数量是用户定义的,宽度由最小/最大值和间隔的数量确定。

例如,给定值 10、20、100、130,最小值为 10,最大值为 130。如果用户将间隔数定义为 6,则给出以下公式:

区间宽度 = (Max(x) - Min(x)) / N

宽度为 (130 - 10) / 6 = 20

六个从零开始的区间是:[ 10, 30, 50, 70, 90, 110, 130]

最后,为数据集中的每个元素定义区间分配:

Value in the dataset    New feature engineered value
          10                      0
          20                      0
          57                      2 
         101                      4
         130                      5
Run Code Online (Sandbox Code Playgroud)

我有以下代码,它使用pandas数据框和sklean函数将数据框以等宽间隔划分:

from sklearn.preprocessing import KBinsDiscretizer
discretizer = KBinsDiscretizer(n_bins=10, encode='ordinal', strategy='uniform')
df['output_col'] = discretizer.fit_transform(df[['input_col']])
Run Code Online (Sandbox Code Playgroud)

这工作正常,但我需要实现一个等效的daskKBinsDiscretizer函数,该函数将在多个分区中并行触发该进程,并且我在dask_ml.preprocessing任何建议中找不到?我无法使用,map_partitions因为它将将该函数独立地应用于每个分区,并且我需要将间隔应用于整个数据帧。

scikit-learn dask dask-distributed dask-dataframe

5
推荐指数
1
解决办法
453
查看次数

对 Dask 数组的列应用函数

将函数应用于 Dask 数组的每一列的最有效方法是什么?如下所述,我已经尝试了很多方法,但我仍然怀疑我对 Dask 的使用相当业余。

\n

我有一个相当宽且相当长的数组,大小约为 3,000,000 x 10,000。我想将 ecdf 函数应用于该数组的每一列。堆叠在一起的各个列结果应生成与输入数组具有相同维度的数组。

\n

考虑以下测试,让我知道哪种方法是理想的方法或者我可以如何改进。我知道,我可以只使用最快的,但我真的想最大限度地利用 Dask 的可能性。阵列也可以大数倍。与此同时,我的基准测试结果令我感到惊讶。也许我没有正确理解 Dask 背后的逻辑。

\n
import numpy as np\nimport dask\nimport dask.array as da\nfrom dask.distributed import Client, LocalCluster\nfrom statsmodels.distributions.empirical_distribution import ECDF\n\n### functions\ndef ecdf(x):\n    fn = ECDF(x)\n    return fn(x)\n\ndef ecdf_array(X):\n\n    res = np.zeros_like(X)\n    for i in range(X.shape[1]):\n        res[:,i] = ecdf(X[:,i])\n        \n    return res\n\n### set up scheduler / workers\nn_workers = 10\ncluster = LocalCluster(n_workers=n_workers, threads_per_worker=4)\nclient = Client(cluster)\n\n### create data set \nX = da.random.random((100000,100)) #dask\nXarr = X.compute() #numpy\n\n### traditional for loop\n%timeit …
Run Code Online (Sandbox Code Playgroud)

python dask dask-delayed dask-distributed dask-dataframe

5
推荐指数
1
解决办法
682
查看次数

使用 dask 合并列

我目前有一个用 pandas 编写的简单脚本,我想将其转换为 dask 数据帧。
在此脚本中,我正在对用户指定列上的两个数据帧执行合并,并尝试将其转换为 dask。

def merge_dfs(df1, df2, columns):
    merged = pd.merge(df1, df2, on=columns, how='inner')
...
Run Code Online (Sandbox Code Playgroud)

如何更改此行以匹配 dask 数据帧?

python dataframe pandas dask dask-dataframe

4
推荐指数
1
解决办法
4538
查看次数

在 dask 中搜索行后获取列值

我有一个 pandas 数据框,我使用from_pandasdask 函数将其转换为 dask 数据框。它有 3 列col1,即col2、 和col3

现在我正在使用我正在搜索的daskdf[(daskdf.col1 == v1) & (daskdf.col2 == v2)]wherev1v2are 值来搜索特定行。col3但是当我尝试获取using的值时,daskdf[(daskdf.col1 == v1) & (daskdf.col2 == v2)]['col3']它给了我一个 dask 系列结构而不是列值。

在熊猫中我可以做到pandasdf[(pandasdf.col1 == v1) & (pandasdf.col2 == v2)]['col3'].tolist()。我如何获取这里的值col3

python dataframe pandas dask dask-dataframe

4
推荐指数
1
解决办法
1754
查看次数

如何从 parquet 文件中删除 __null_dask_index ?

我正在使用Dask将 df 写入Parquet文件:

df.to_parquet(file, compression='snappy', write_metadata_file=False,\
              engine='pyarrow', index=None)
Run Code Online (Sandbox Code Playgroud)

我需要在在线镶木地板查看器中显示文件的内容,

显示的列是:

Column1  Column2  Column3  __null_dask_index__
Run Code Online (Sandbox Code Playgroud)

如何删除该__null_dask_index__列?

python dataframe parquet dask dask-dataframe

4
推荐指数
1
解决办法
1182
查看次数

本地集群上加载 Dask 数据:“工作线程超出了 95% 的内存预算”。重新启动然后“KilledWorker”

我知道以前也有人问过类似的问题,但他们的解决方案并不是很有帮助。我想最好的解决方案可能更具体于每个集群配置,因此我在此处提供有关我的集群和错误的更多详细信息。

import dask.dataframe as dd
import dask.bag as db
import json

from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
Run Code Online (Sandbox Code Playgroud)

这是我的集群设置

cluster.scheduler
Run Code Online (Sandbox Code Playgroud)

#输出:

Scheduler: tcp://127.0.0.1:35367 workers: 8 cores: 48 tasks: 0



cluster.workers
Run Code Online (Sandbox Code Playgroud)

#输出:

{0: <Nanny: tcp://127.0.0.1:43789, threads: 6>,
 1: <Nanny: tcp://127.0.0.1:41375, threads: 6>,
 2: <Nanny: tcp://127.0.0.1:42577, threads: 6>,
 3: <Nanny: tcp://127.0.0.1:40171, threads: 6>,
 4: <Nanny: tcp://127.0.0.1:32867, threads: 6>,
 5: <Nanny: tcp://127.0.0.1:46529, threads: 6>,
 6: <Nanny: tcp://127.0.0.1:41535, threads: 6>,
 7: <Nanny: tcp://127.0.0.1:39645, threads: 6>}


client
Run Code Online (Sandbox Code Playgroud)

#输出

Client
Scheduler: tcp://127.0.0.1:35367 …
Run Code Online (Sandbox Code Playgroud)

memory-management cluster-computing worker bigdata dask-dataframe

3
推荐指数
1
解决办法
1180
查看次数

将 Matplotlib 与 Dask 结合使用

假设我们有 pandas dataframepd和 dask dataframe dd。当我想用 matplotlib 绘制 pandas 时,我可以轻松做到:

fig, ax = plt.subplots()
ax.bar(pd["series1"], pd["series2"])
fig.savefig(path)
Run Code Online (Sandbox Code Playgroud)

然而,当我尝试对 dask dataframe 执行相同操作时,我得到的Type Errors是:

TypeError: Cannot interpret 'string[python]' as a data type
Run Code Online (Sandbox Code Playgroud)

string[python]这只是一个示例,无论您的dd["series1"]数据类型是什么,都将在此处输入。

所以我的问题是:使用matplotlibwith 的正确方法是什么dask?将这两个库结合起来是否是一个好主意?

python matplotlib pandas dask dask-dataframe

3
推荐指数
2
解决办法
2102
查看次数

ModuleNotFoundError:没有名为“dask.dataframe”的模块;“dask”不是一个包

对于当前的项目,我计划将两个非常大的 CSV 文件与 Dask 合并,作为 Pandas 的替代方案。我已经彻底安装了Dask pip install "dask[dataframe]"

然而,在跑步时import dask.dataframe as dd,我收到了反馈ModuleNotFoundError: No module named 'dask.dataframe'; 'dask' is not a package

几个用户似乎遇到了同样的问题,并建议通过 Conda 安装该模块,这对我的情况也没有帮助。

找不到模块是什么原因?

python pandas dask dask-dataframe

2
推荐指数
1
解决办法
4053
查看次数