Sag*_*rys 2 numpy pandas apache-spark apache-spark-sql pyspark
我有一个很长的 pyspark 数据框,如下所示:
+------+
|number|
+------+
|12.4 |
|13.4 |
|42.3 |
|33.4 |
|42.3 |
|32.4 |
|44.2 |
|12.3 |
|45.4 |
+------+
Run Code Online (Sandbox Code Playgroud)
理想情况下,我希望将其重塑为nxn
矩阵,其中n
is sqrt(length of pyspark dataframe)
。
虽然有一个解决方案是将其转换为一个 numpy 数组,然后将其重塑为nxn
矩阵,但我希望在 pyspark 中完成。因为我的数据超长(大概一亿行)。
所以我正在寻找的预期输出是这样的:
+------+------+------+
|12.4 | 13.4 | 42.3 |
|33.4 | 42.3 | 32.4 |
|44.2 | 12.3 | 45.4 |
+------+------+------+
Run Code Online (Sandbox Code Playgroud)
虽然我能够通过将其转换为熊猫然后转换为 numpy 然后进行重塑操作来正确地做到这一点。但我想在 Pyspark 本身中进行这种转换。因为下面的代码仅适用于几千行。
covarianceMatrix_pd = covarianceMatrix_df.toPandas()
nrows = np.sqrt(len(covarianceMatrix_pd))
covarianceMatrix_pd = covarianceMatrix_pd.to_numpy().reshape((int(nrows),int(nrows)))
covarianceMatrix_pd
Run Code Online (Sandbox Code Playgroud)
一种方法是row_number
在我们对数据帧进行计数后使用with pivot :
from pyspark.sql import functions as F, Window
from math import sqrt
Run Code Online (Sandbox Code Playgroud)
c = int(sqrt(df.count())) #this gives 3
rnum = F.row_number().over(Window.orderBy(F.lit(1)))
out = (df.withColumn("Rnum",((rnum-1)/c).cast("Integer"))
.withColumn("idx",F.row_number().over(Window.partitionBy("Rnum").orderBy("Rnum")))
.groupby("Rnum").pivot("idx").agg(F.first("number")))
Run Code Online (Sandbox Code Playgroud)
out.show()
+----+----+----+----+
|Rnum| 1| 2| 3|
+----+----+----+----+
| 0|12.4|13.4|42.3|
| 1|33.4|42.3|32.4|
| 2|44.2|12.3|45.4|
+----+----+----+----+
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
108 次 |
最近记录: |