想象一下我有一个Dask来自read_csv或以其他方式创建的 DataFrame。
如何为 dask 数据框创建唯一索引?
笔记:
reset_index在每个分区中构建单调升序索引。这意味着分区 1 为 (0,1,2,3,4,5,... ),分区 2 为 (0,1,2,3,4,5,... ),(0,1,2 ,3,4,5,... ) 对于分区 3 等等。
我想要数据帧中的每一行(跨所有分区)都有一个唯一的索引。
dask.dataframe尽管import dask有效,但在尝试导入接口时,我收到问题标题中所述的错误。
我当前的 dask 版本是2022.7.0. 可能是什么问题?
我有一个与此类似的 Pandas 数据框:
datetime data1 data2
2021-01-23 00:00:31.140 a1 a2
2021-01-23 00:00:31.140 b1 b2
2021-01-23 00:00:31.140 c1 c2
2021-01-23 00:01:29.021 d1 d2
2021-01-23 00:02:10.540 e1 e2
2021-01-23 00:02:10.540 f1 f2
Run Code Online (Sandbox Code Playgroud)
真实的数据帧非常大,对于每个唯一的时间戳,都有几千行。
我想将此数据帧保存到 Parquet 文件中,以便我可以快速读取具有特定日期时间索引的所有行,而无需加载整个文件或遍历它。如何在 Python 中正确保存它,以及如何快速仅读取一个特定日期时间的行?
阅读后,我想要一个新的数据框,其中包含该特定日期时间的所有行。例如,我只想从 Parquet 文件中读取 datetime "2021-01-23 00:00:31.140" 的行并接收此数据帧:
datetime data1 data2
2021-01-23 00:00:31.140 a1 a2
2021-01-23 00:00:31.140 b1 b2
2021-01-23 00:00:31.140 c1 c2
Run Code Online (Sandbox Code Playgroud)
我想知道它可能首先需要将每个时间戳的数据转换为一列,像这样,以便可以通过读取列而不是行来访问它?
2021-01-23 00:00:31.140 2021-01-23 00:01:29.021 2021-01-23 00:02:10.540
['a1', 'a2'] ['d1', 'd2'] ['e1', 'e2']
['b1', 'b2'] NaN ['f1', 'f2']
['c1', 'c2'] NaN NaN …Run Code Online (Sandbox Code Playgroud) 现在,我正在使用 Dask 读取大型 csv 文件,并对其进行一些后处理(例如,进行一些数学运算,然后通过某些 ML 模型进行预测并将结果写入数据库)。避免将所有数据加载到内存中,我想按当前大小的块读取:读取第一个块,预测,写入,读取第二个块等。
我使用skiprowsand尝试了下一个解决方案nrows:
import dask.dataframe as dd
read_path = "medium.csv"
# Read by chunk
skiprows = 100000
nrows = 50000
res_df = dd.read_csv(read_path, skiprows=skiprows)
res_df = res_df.head(nrows)
print(res_df.shape)
print(res_df.head())
Run Code Online (Sandbox Code Playgroud)
但我收到错误:
ValueError:样本不够大,无法包含至少一行数据。
sample请增加调用中的 字节数read_csv/read_table
另外,据我了解,它每次都会为所有数据计算二进制掩码([False,False,...,True,...])以查找要加载的行。我们怎样才能更有效地做到这一点?也许使用 dask 中的一些分布式或延迟函数?
我正在尝试使用一个小例子来学习 Dask。基本上我读入一个文件并计算行平均值。
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(cores=4, memory='24 GB')
cluster.scale(4)
from dask.distributed import Client
client = Client(cluster)
import dask
import numpy as np
import dask.dataframe as dd
mytbl = dd.read_csv('test.txt', sep=' ')
row_mean = mytbl.loc[:, mytbl.columns != 'chrom'].apply(np.mean, axis=1, meta=(None, 'float64'))
row_mean = row_mean.compute()
Run Code Online (Sandbox Code Playgroud)
当我运行时compute,我可以在Dask仪表板中看到内存使用量增加非常快,并且所有CPU也都被使用。但随后内存增加停止,我看到这个错误:
distributed.utils - ERROR - "('read-csv-apply-67c8f79a72a7499f86a1707b269e0776', 0)"
Traceback (most recent call last):
File "~/miniconda3/lib/python3.8/site-packages/distributed/utils.py", line 668, in log_errors
yield
File "~/miniconda3/lib/python3.8/site-packages/distributed/scheduler.py", line 3785, in add_worker
typename=types[key],
KeyError: "('read-csv-apply-67c8f79a72a7499f86a1707b269e0776', 0)"
distributed.core - ERROR …Run Code Online (Sandbox Code Playgroud) 在等宽离散化中,变量值被分配到相同宽度的区间。间隔的数量是用户定义的,宽度由最小/最大值和间隔的数量确定。
例如,给定值 10、20、100、130,最小值为 10,最大值为 130。如果用户将间隔数定义为 6,则给出以下公式:
区间宽度 = (Max(x) - Min(x)) / N
宽度为 (130 - 10) / 6 = 20
六个从零开始的区间是:[ 10, 30, 50, 70, 90, 110, 130]
最后,为数据集中的每个元素定义区间分配:
Value in the dataset New feature engineered value
10 0
20 0
57 2
101 4
130 5
Run Code Online (Sandbox Code Playgroud)
我有以下代码,它使用pandas数据框和sklean函数将数据框以等宽间隔划分:
from sklearn.preprocessing import KBinsDiscretizer
discretizer = KBinsDiscretizer(n_bins=10, encode='ordinal', strategy='uniform')
df['output_col'] = discretizer.fit_transform(df[['input_col']])
Run Code Online (Sandbox Code Playgroud)
这工作正常,但我需要实现一个等效的daskKBinsDiscretizer函数,该函数将在多个分区中并行触发该进程,并且我在dask_ml.preprocessing任何建议中找不到?我无法使用,map_partitions因为它将将该函数独立地应用于每个分区,并且我需要将间隔应用于整个数据帧。
将函数应用于 Dask 数组的每一列的最有效方法是什么?如下所述,我已经尝试了很多方法,但我仍然怀疑我对 Dask 的使用相当业余。
\n我有一个相当宽且相当长的数组,大小约为 3,000,000 x 10,000。我想将 ecdf 函数应用于该数组的每一列。堆叠在一起的各个列结果应生成与输入数组具有相同维度的数组。
\n考虑以下测试,让我知道哪种方法是理想的方法或者我可以如何改进。我知道,我可以只使用最快的,但我真的想最大限度地利用 Dask 的可能性。阵列也可以大数倍。与此同时,我的基准测试结果令我感到惊讶。也许我没有正确理解 Dask 背后的逻辑。
\nimport numpy as np\nimport dask\nimport dask.array as da\nfrom dask.distributed import Client, LocalCluster\nfrom statsmodels.distributions.empirical_distribution import ECDF\n\n### functions\ndef ecdf(x):\n fn = ECDF(x)\n return fn(x)\n\ndef ecdf_array(X):\n\n res = np.zeros_like(X)\n for i in range(X.shape[1]):\n res[:,i] = ecdf(X[:,i])\n \n return res\n\n### set up scheduler / workers\nn_workers = 10\ncluster = LocalCluster(n_workers=n_workers, threads_per_worker=4)\nclient = Client(cluster)\n\n### create data set \nX = da.random.random((100000,100)) #dask\nXarr = X.compute() #numpy\n\n### traditional for loop\n%timeit …Run Code Online (Sandbox Code Playgroud) 我目前有一个用 pandas 编写的简单脚本,我想将其转换为 dask 数据帧。
在此脚本中,我正在对用户指定列上的两个数据帧执行合并,并尝试将其转换为 dask。
def merge_dfs(df1, df2, columns):
merged = pd.merge(df1, df2, on=columns, how='inner')
...
Run Code Online (Sandbox Code Playgroud)
如何更改此行以匹配 dask 数据帧?
我有一个 pandas 数据框,我使用from_pandasdask 函数将其转换为 dask 数据框。它有 3 列col1,即col2、 和col3。
现在我正在使用我正在搜索的daskdf[(daskdf.col1 == v1) & (daskdf.col2 == v2)]wherev1和v2are 值来搜索特定行。col3但是当我尝试获取using的值时,daskdf[(daskdf.col1 == v1) & (daskdf.col2 == v2)]['col3']它给了我一个 dask 系列结构而不是列值。
在熊猫中我可以做到pandasdf[(pandasdf.col1 == v1) & (pandasdf.col2 == v2)]['col3'].tolist()。我如何获取这里的值col3?
我正在使用Dask将 df 写入Parquet文件:
df.to_parquet(file, compression='snappy', write_metadata_file=False,\
engine='pyarrow', index=None)
Run Code Online (Sandbox Code Playgroud)
我需要在在线镶木地板查看器中显示文件的内容,
显示的列是:
Column1 Column2 Column3 __null_dask_index__
Run Code Online (Sandbox Code Playgroud)
如何删除该__null_dask_index__列?
我知道以前也有人问过类似的问题,但他们的解决方案并不是很有帮助。我想最好的解决方案可能更具体于每个集群配置,因此我在此处提供有关我的集群和错误的更多详细信息。
import dask.dataframe as dd
import dask.bag as db
import json
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
Run Code Online (Sandbox Code Playgroud)
这是我的集群设置
cluster.scheduler
Run Code Online (Sandbox Code Playgroud)
#输出:
Scheduler: tcp://127.0.0.1:35367 workers: 8 cores: 48 tasks: 0
cluster.workers
Run Code Online (Sandbox Code Playgroud)
#输出:
{0: <Nanny: tcp://127.0.0.1:43789, threads: 6>,
1: <Nanny: tcp://127.0.0.1:41375, threads: 6>,
2: <Nanny: tcp://127.0.0.1:42577, threads: 6>,
3: <Nanny: tcp://127.0.0.1:40171, threads: 6>,
4: <Nanny: tcp://127.0.0.1:32867, threads: 6>,
5: <Nanny: tcp://127.0.0.1:46529, threads: 6>,
6: <Nanny: tcp://127.0.0.1:41535, threads: 6>,
7: <Nanny: tcp://127.0.0.1:39645, threads: 6>}
client
Run Code Online (Sandbox Code Playgroud)
#输出
Client
Scheduler: tcp://127.0.0.1:35367 …Run Code Online (Sandbox Code Playgroud) memory-management cluster-computing worker bigdata dask-dataframe
假设我们有 pandas dataframepd和 dask dataframe dd。当我想用 matplotlib 绘制 pandas 时,我可以轻松做到:
fig, ax = plt.subplots()
ax.bar(pd["series1"], pd["series2"])
fig.savefig(path)
Run Code Online (Sandbox Code Playgroud)
然而,当我尝试对 dask dataframe 执行相同操作时,我得到的Type Errors是:
TypeError: Cannot interpret 'string[python]' as a data type
Run Code Online (Sandbox Code Playgroud)
string[python]这只是一个示例,无论您的dd["series1"]数据类型是什么,都将在此处输入。
所以我的问题是:使用matplotlibwith 的正确方法是什么dask?将这两个库结合起来是否是一个好主意?
对于当前的项目,我计划将两个非常大的 CSV 文件与 Dask 合并,作为 Pandas 的替代方案。我已经彻底安装了Dask pip install "dask[dataframe]"。
然而,在跑步时import dask.dataframe as dd,我收到了反馈ModuleNotFoundError: No module named 'dask.dataframe'; 'dask' is not a package。
几个用户似乎遇到了同样的问题,并建议通过 Conda 安装该模块,这对我的情况也没有帮助。
找不到模块是什么原因?
dask-dataframe ×13
dask ×12
python ×10
pandas ×7
dataframe ×4
parquet ×2
bigdata ×1
csv ×1
dask-delayed ×1
matplotlib ×1
scikit-learn ×1
worker ×1