Geo*_*ler 3 sql apache-spark apache-spark-sql
我有一张桌子
+---------------+------+
|id | value|
+---------------+------+
| 1|118.0|
| 2|109.0|
| 3|113.0|
| 4| 82.0|
| 5| 60.0|
| 6|111.0|
| 7|107.0|
| 8| 84.0|
| 9| 91.0|
| 10|118.0|
+---------------+------+
Run Code Online (Sandbox Code Playgroud)
ans想要将值聚合或bin到一个范围0,10,20,30,40,...80,90,100,110,120我如何在SQL或更具体的spark sql中执行此操作?
目前我有一个横向视图连接范围,但这似乎相当笨拙/低效.
离散化的分位数并不是我想要的,而是CUT具有这个范围.
https://github.com/collectivemedia/spark-ext/blob/master/sparkext-mllib/src/main/scala/org/apache/spark/ml/feature/Binning.scala会执行动态垃圾箱,但我宁愿需要这个指定的范围.
Hri*_*iev 10
在一般情况下,可以使用org.apache.spark.ml.feature.Bucketizer执行静态分箱:
val df = Seq(
(1, 118.0), (2, 109.0), (3, 113.0), (4, 82.0), (5, 60.0),
(6, 111.0), (7, 107.0), (8, 84.0), (9, 91.0), (10, 118.0)
).toDF("id", "value")
val splits = (0 to 12).map(_ * 10.0).toArray
import org.apache.spark.ml.feature.Bucketizer
val bucketizer = new Bucketizer()
.setInputCol("value")
.setOutputCol("bucket")
.setSplits(splits)
val bucketed = bucketizer.transform(df)
val solution = bucketed.groupBy($"bucket").agg(count($"id") as "count")
Run Code Online (Sandbox Code Playgroud)
结果:
scala> solution.show
+------+-----+
|bucket|count|
+------+-----+
| 8.0| 2|
| 11.0| 4|
| 10.0| 2|
| 6.0| 1|
| 9.0| 1|
+------+-----+
Run Code Online (Sandbox Code Playgroud)
当值位于定义的区间之外时,bucketizer会抛出错误.可以将分割点定义为捕获异常值Double.NegativeInfinity或Double.PositiveInfinity捕获异常值.
Bucketizer通过对正确的存储桶执行二进制搜索,可以有效地处理任意拆分.对于像你这样的常规垃圾箱,可以简单地做一些事情:
val binned = df.withColumn("bucket", (($"value" - bin_min) / bin_width) cast "int")
Run Code Online (Sandbox Code Playgroud)
其中bin_min和bin_width是最小bin的左边距和bin宽度.
| 归档时间: |
|
| 查看次数: |
3358 次 |
| 最近记录: |