MrG*_*MrG 2 scala apache-spark apache-spark-sql
我有一个如下的数据框
articles
10
99
101
101
10005
1000001
1000001
我想要输出数据帧如下
range              sum
1-100              109
101-10000          202
10001-1000000      10005
1000001-100000000  2000002
...                ...
如何实现这一目标.我是新来的火花和斯卡拉.
我建议您首先使用when/ 找到值的范围,otherwise然后您可以对其进行分组range并对以下内容执行sum聚合articles:
import org.apache.spark.sql.functions._
df.withColumn("range", 
          when($"articles" >  0 and $"articles" <= 100, lit("1-100"))
            .otherwise(
              when($"articles" > 100 and $"articles" <= 10000, lit("101-10000")).otherwise(lit("others"))
            )
         ).groupBy("range").agg(sum($"articles")).orderBy("range").show
// +---------+-------------+
// |    range|sum(articles)|
// +---------+-------------+
// |    1-100|          109|
// |101-10000|          202|
// |   others|      2010007|
// +---------+-------------+
我会使用 UDF 对文章进行分类(分桶),然后使用普通groupBy().agg()来计算总和。
case class Bucket(start: Long, end: Long) {
  def contains(l: Long) = start <= l && end >= l
  override def toString: String = s"$start - $end"
}
val buckets = Seq(
  Bucket(1L, 100L),
  Bucket(101L, 10000L),
  Bucket(10001L, 100000L),
  Bucket(1000001L, 10000000L)
)
val bucketize = udf((l: Long) => buckets.find(_.contains(l)).map(_.toString))
df
  .withColumn("bucket", bucketize($"article"))
  .groupBy($"bucket")
  .agg(
    sum($"article").as("sum")
  )
您可以使用 a 上的groupByKey方法Dataset轻松定义您的键控,而不是像通常使用groupBy. 以下示例可以在您的 上运行spark-shell,否则请记住创建您的SparkSession和import org.apache.spark.sql.functions.sum:
// relevant types: one for actual data, the other to define ranges
final case class Data(articles: Int)
final case class Range(from: Int, to: Int)
// the data we want to process
val dataset = spark.createDataset(
  Seq(Data(10), Data(99), Data(101), Data(101), Data(10005), Data(1000001), Data(1000001)))
// the ranges we wanto _bucket_ our data in
val ranges = spark.sparkContext.broadcast(
  Seq(Range(1, 100), Range(101, 10000), Range(10001, 1000000), Range(1000001, 100000000)))
// the actual operation: group by range and sum the values in each bucket
dataset.groupByKey {
  d =>
    ranges.value.find(r => d.articles >= r.from && d.articles <= r.to).orNull
}.agg(sum("articles").as[Long])
这将是这段代码的输出:
+-------------------+-------------+
|                key|sum(articles)|
+-------------------+-------------+
|            [1,100]|          109|
|        [101,10000]|          202|
|    [10001,1000000]|        10005|
|[1000001,100000000]|      2000002|
+-------------------+-------------+
我们所做的:
sumbyarticles并将结果强制转换为Long(typed Datasets需要)不属于特定桶的数据将被分组到具有null范围的行中。
请注意,我使用“ bucket ”一词来表达按范围分组的含义,但这与 Hive 的“bucketing”(在尝试优化 Spark 上的连接时您可能会听到很多)没有联系。
| 归档时间: | 
 | 
| 查看次数: | 1543 次 | 
| 最近记录: |