acc*_*rma 7 scala apache-spark apache-spark-sql
我一直在研究aSspark应用程序并尝试转换数据帧,如表1所示.我想将列(_2)的每个元素除以另一列(_1)的元素组成的元素之和(_2) ).表2是预期结果.
表格1
+---+---+
| _1| _2|
+---+---+
| 0| 13|
| 0| 7|
| 0| 3|
| 0| 1|
| 0| 1|
| 1| 4|
| 1| 8|
| 1| 18|
| 1| 4|
+---+---+
Run Code Online (Sandbox Code Playgroud)
表2
+---+----+
| _1| _2 |
+---+----+
| 0|13/x|
| 0| 7/x|
| 0| 3/x|
| 0| 1/x|
| 0| 1/x|
| 1| 4/y|
| 1| 8/y|
| 1|18/y|
| 1| 4/y|
+---+----+
Run Code Online (Sandbox Code Playgroud)
其中,x =(13 + 7 + 3 + 1 + 1)和y =(4 + 8 + 18 + 4)
然后,我想计算列_1中每个元素的熵:即,对于列_1中的每个元素,计算列_2中的和(p_i x log(p_i)).其中,p_i基本上是表2中 _1列中每个值的_2列中的值.
最终的输出是.
+---+---------+
| _1| ENTROPY |
+---+---------+
| 0|entropy_1|
| 1|entropy_2|
+---+---------+
Run Code Online (Sandbox Code Playgroud)
我怎样才能在spark(最好是scala)中实现它?执行上述操作的优化方法是什么?我是scala的新手,任何相关的建议都将受到高度赞赏.
谢谢.
如果您想要一个简洁的解决方案并且组相当小,您可以使用窗口函数。首先你必须定义一个窗口:
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("_1").rowsBetween(Long.MinValue, Long.MaxValue)
Run Code Online (Sandbox Code Playgroud)
可能性:
import org.apache.spark.sql.functions.sum
val p = $"_2" / sum($"_2").over(w)
val withP = df.withColumn("p", p)
Run Code Online (Sandbox Code Playgroud)
最后是熵:
import org.apache.spark.sql.functions.log2
withP.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
Run Code Online (Sandbox Code Playgroud)
对于示例数据
val df = Seq(
(0, 13), (0, 7), (0, 3), (0, 1), (0, 1), (1, 4), (1, 8), (1, 18), (1, 4)).toDF
Run Code Online (Sandbox Code Playgroud)
结果是:
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("_1").rowsBetween(Long.MinValue, Long.MaxValue)
Run Code Online (Sandbox Code Playgroud)
如果窗口函数的性能不可接受,您可以尝试聚合-连接-聚合:
df.groupBy($"_1").agg(sum("_2").alias("total"))
.join(df, Seq("_1"), "inner")
.withColumn("p", $"_2" / $"total")
.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
Run Code Online (Sandbox Code Playgroud)
在哪里:
df.groupBy($"_1").agg(sum("_2").alias("total"))
Run Code Online (Sandbox Code Playgroud)
计算_2by的总和_1,
_.join(df, Seq("_1"), "inner")
Run Code Online (Sandbox Code Playgroud)
将聚合列添加到原始数据,
_.withColumn("p", $"_2" / $"total")
Run Code Online (Sandbox Code Playgroud)
计算概率,并且:
_.groupBy($"_1").agg((-sum($"p" * log2($"p"))).alias("entropy"))
Run Code Online (Sandbox Code Playgroud)
聚合以获得熵。
| 归档时间: |
|
| 查看次数: |
3501 次 |
| 最近记录: |