将列的元素除以按另一列的元素分组的元素(同一列)的总和

acc*_*rma 7 scala apache-spark apache-spark-sql

我一直在研究aSspark应用程序并尝试转换数据帧,如表1所示.我想将列(_2)的每个元素除以另一列(_1)的元素组成的元素之和(_2) ).表2是预期结果.

表格1

+---+---+
| _1| _2|
+---+---+
|  0| 13|
|  0|  7|
|  0|  3|
|  0|  1|
|  0|  1|
|  1|  4|
|  1|  8|
|  1| 18|
|  1|  4|
+---+---+
Run Code Online (Sandbox Code Playgroud)

表2

+---+----+
| _1| _2 |
+---+----+
|  0|13/x|
|  0| 7/x|
|  0| 3/x|
|  0| 1/x|
|  0| 1/x|
|  1| 4/y|
|  1| 8/y|
|  1|18/y|
|  1| 4/y|
+---+----+
Run Code Online (Sandbox Code Playgroud)

其中,x =(13 + 7 + 3 + 1 + 1)和y =(4 + 8 + 18 + 4)

然后,我想计算列_1中每个元素的熵:即,对于列_1中的每个元素,计算列_2中的和(p_i x log(p_i)).其中,p_i基本上是表2中 _1列中每个值的_2列中的值.

最终的输出是.

+---+---------+
| _1| ENTROPY |
+---+---------+
|  0|entropy_1|
|  1|entropy_2|
+---+---------+
Run Code Online (Sandbox Code Playgroud)

我怎样才能在spark(最好是scala)中实现它?执行上述操作的优化方法是什么?我是scala的新手,任何相关的建议都将受到高度赞赏.

谢谢.

zer*_*323 4

如果您想要一个简洁的解决方案并且组相当小,您可以使用窗口函数。首先你必须定义一个窗口:

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("_1").rowsBetween(Long.MinValue, Long.MaxValue)
Run Code Online (Sandbox Code Playgroud)

可能性:

import org.apache.spark.sql.functions.sum

val p = $"_2" / sum($"_2").over(w)
val withP = df.withColumn("p", p)
Run Code Online (Sandbox Code Playgroud)

最后是熵:

import org.apache.spark.sql.functions.log2

withP.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
Run Code Online (Sandbox Code Playgroud)

对于示例数据

val df = Seq(
  (0, 13), (0, 7), (0, 3), (0, 1), (0, 1), (1, 4), (1, 8), (1, 18), (1, 4)).toDF
Run Code Online (Sandbox Code Playgroud)

结果是:

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("_1").rowsBetween(Long.MinValue, Long.MaxValue)
Run Code Online (Sandbox Code Playgroud)

如果窗口函数的性能不可接受,您可以尝试聚合-连接-聚合:

df.groupBy($"_1").agg(sum("_2").alias("total"))
  .join(df, Seq("_1"), "inner")
  .withColumn("p", $"_2" / $"total")
  .groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
Run Code Online (Sandbox Code Playgroud)

在哪里:

df.groupBy($"_1").agg(sum("_2").alias("total"))
Run Code Online (Sandbox Code Playgroud)

计算_2by的总和_1

_.join(df, Seq("_1"), "inner")
Run Code Online (Sandbox Code Playgroud)

将聚合列添加到原始数据,

_.withColumn("p", $"_2" / $"total")
Run Code Online (Sandbox Code Playgroud)

计算概率,并且:

_.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
Run Code Online (Sandbox Code Playgroud)

聚合以获得熵。