Reducing an arbitrary list of date ranges into distinct date ranges, using Spark 2.3.1 with Scala

Jer*_*emy 5 scala user-defined-functions apache-spark apache-spark-sql

Given a list of date ranges, some of which overlap:

val df = Seq(
  ("Mike","2018-09-01","2018-09-10"), // range 1
  ("Mike","2018-09-05","2018-09-05"), // range 1
  ("Mike","2018-09-12","2018-09-12"), // range 1
  ("Mike","2018-09-11","2018-09-11"), // range 1
  ("Mike","2018-09-25","2018-09-29"), // range 4
  ("Mike","2018-09-21","2018-09-23"), // range 4
  ("Mike","2018-09-24","2018-09-24"), // range 4
  ("Mike","2018-09-14","2018-09-16"), // range 2
  ("Mike","2018-09-15","2018-09-17"), // range 2
  ("Mike","2018-09-05","2018-09-05"), // range 1
  ("Mike","2018-09-19","2018-09-19"), // range 3
  ("Mike","2018-09-19","2018-09-19"), // range 3
  ("Mike","2018-08-19","2018-08-20"), // range 5
  ("Mike","2018-10-01","2018-10-20"), // range 6
  ("Mike","2018-10-10","2018-10-30")  // range 6
).toDF("name", "start", "end")

I want to reduce the data down to the minimal set of date ranges that completely encapsulate the above dates, without adding any extra dates:

+----+----------+----------+                                                    
|name|start     |end       |
+----+----------+----------+
|Mike|2018-09-01|2018-09-12|
|Mike|2018-09-14|2018-09-17|
|Mike|2018-09-19|2018-09-19|
|Mike|2018-09-21|2018-09-29|
|Mike|2018-08-19|2018-08-20|
|Mike|2018-10-01|2018-10-30|
+----+----------+----------+

Edit: Added three new entries to the test data to illustrate new edge cases.

I cannot count on the dates being in any particular order.

My best attempt so far goes like this:

  1. Explode each date range into its set of individual days.
  2. Combine all of the day sets into one big set.
  3. Sort the set into a list so the days are in order.
  4. Aggregate the individual days back into lists of consecutive days.
  5. Take the first and last day of each list as the new date ranges.

The code looks like this:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import scala.collection.immutable.NumericRange
import java.time.LocalDate

case class MyRange(start:String, end:String)

val combineRanges = udf((ranges: Seq[Row]) => {
  ranges.map(r => LocalDate.parse(r(0).toString).toEpochDay to LocalDate.parse(r(1).toString).toEpochDay)
    .map(_.toIndexedSeq).reduce(_ ++ _).distinct.toList.sorted
    .aggregate(List.empty[Vector[Long]])((ranges:List[Vector[Long]], d:Long) => {
    ranges.lastOption.find(_.last + 1 == d) match {
      case Some(r:Vector[Long]) => ranges.dropRight(1) :+ (r :+ d)
      case None => ranges :+ Vector(d)
    }
  }, _ ++ _).map(v => MyRange(LocalDate.ofEpochDay(v.head).toString, LocalDate.ofEpochDay(v.last).toString))
})

df.groupBy("name")
  .agg(combineRanges(collect_list(struct($"start", $"end"))) as "ranges")
  .withColumn("ranges", explode($"ranges"))
  .select($"name", $"ranges.start", $"ranges.end")
  .show(false)

It seems to work, but it is very ugly and probably wasteful of time and memory.

I was kind of hoping to use the Scala Range class to only conceptually explode the date ranges into their individual days, but I have a feeling the sort operation forces Scala's hand and makes it actually create a list of all the dates in memory.
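For what it's worth, here is a rough sketch (plain Scala, outside Spark) of the kind of direct range merge I have in mind - sort by start date and fold the ranges together, never expanding any range into its individual days. The DateRange case class and mergeRanges helper are just names I've made up for the illustration:

import java.time.LocalDate

case class DateRange(start: LocalDate, end: LocalDate)

// Sort by start date, then fold each range into the accumulator,
// extending the last merged range whenever the next one overlaps or is adjacent to it.
def mergeRanges(ranges: Seq[DateRange]): List[DateRange] =
  ranges.sortBy(_.start.toEpochDay).foldLeft(List.empty[DateRange]) {
    // next range starts no later than the day after the last merged range ends: extend it
    case (init :+ last, r) if !r.start.isAfter(last.end.plusDays(1)) =>
      init :+ DateRange(last.start, if (r.end.isAfter(last.end)) r.end else last.end)
    // otherwise it starts a new merged range
    case (acc, r) => acc :+ r
  }

Something like this could presumably replace the explode/sort/regroup steps inside the UDF, but I haven't measured whether it is actually any faster.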

Does anyone have a better way of doing this?

the*_*tom 1

Here is an alternative approach with DataFrames and Spark SQL, both non-procedural and procedural, by definition. It takes a careful read, but stick with it.

// Aspects such as caching and re-partitioning for performance are not considered. On the other hand it all happens under the bonnet with DFs - so they say.
// Functional only.
import org.apache.spark.sql.functions._
import spark.implicits._
import java.time._
import org.apache.spark.sql.functions.{lead, lag}
import org.apache.spark.sql.expressions.Window

def toEpochDay(s: String) = LocalDate.parse(s).toEpochDay
val toEpochDayUdf = udf(toEpochDay(_: String))

val df = Seq(
("Betty","2018-09-05","2018-09-05"),  ("Betty","2018-09-05","2018-09-05"), 
("Betty","2018-09-05","2018-09-08"),  ("Betty","2018-09-07","2018-09-10"),  
("Betty","2018-09-07","2018-09-08"),  ("Betty","2018-09-06","2018-09-07"),  
("Betty","2018-09-10","2018-09-15"),  ("Betty","2017-09-10","2017-09-15"),
("XXX","2017-09-04","2017-09-10"),    ("XXX","2017-09-10","2017-09-15"),
("YYY","2017-09-04","2017-09-10"),    ("YYY","2017-09-11","2017-09-15"),
("Bob","2018-09-01","2018-09-02"),    ("Bob","2018-09-04","2018-09-05"),  
("Bob","2018-09-06","2018-09-07"),    ("Bob","2019-09-04","2019-09-05"),  
("Bob","2019-09-06","2019-09-07"),    ("Bob","2018-09-08","2018-09-22")   
           ).toDF("name", "start", "end")

// Remove any duplicates - pointless to re-process these!
val df2 = df.withColumn("s", toEpochDayUdf($"start")).withColumn("e", toEpochDayUdf($"end")).distinct  
df2.show(false) // The original input
df2.createOrReplaceTempView("ranges")

// Find those records encompassed by a broader time frame and hence not required for processing.
val q = spark.sql("""  SELECT * 
                         FROM ranges r1
                        WHERE EXISTS (SELECT r2.name                        
                                        FROM ranges r2
                                       WHERE r2.name = r1.name 
                                         AND ((r1.s >= r2.s AND r1.e < r2.e) OR 
                                              (r1.e <= r2.e AND r1.s > r2.s))
                                     ) 
                  """)   
//q.show(false)

val df3 = df2.except(q) // Overlapping or on their own / single range records left.
//df3.show(false)
df3.createOrReplaceTempView("ranges2")

// Find those ranges that have a gap between themselves and any adjacent records, before or after, i.e. records that stand on their own and are de facto the first part of the answer.
val q2 = spark.sql("""  SELECT * 
                         FROM ranges2 r1
                        WHERE NOT EXISTS (SELECT r2.name                        
                                            FROM ranges2 r2
                                           WHERE r2.name = r1.name 
                                             AND ((r2.e >= r1.s - 1 AND r2.s <= r1.s - 1) OR
                                                  (r2.s <= r1.e + 1 AND r2.e >= r1.e + 1))
                                          ) 
                   """)

// Store the first set of records that exist on their own with some form of gap - the first part of the overall result set.
val result1 = q2.select("name", "start", "end")
result1.show(false) 

// Get the records / ranges that overlap and therefore still need processing - the second and remaining set of records.
val df4 = df3.except(q2) 
//df4.show(false)

//Avoid Serialization errors with lag!
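// lag over the end-day column e (per name, ordered by e) gives each row the previous range's end, so a gap to that previous range can be computed below as s - new_col.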
@transient val w = org.apache.spark.sql.expressions.Window.partitionBy("name").orderBy("e")
@transient val lag_y = lag("e", 1, -99999999).over(w)
//df.select(lag_y).map(f _).first
val df5 = df4.withColumn("new_col", lag_y)
//df5.show(false)

// Massage data to get results via easier queries, e.g. avoid issues with correlated sub-queries.
val myExpression = "s - new_col"
val df6 = df5.withColumn("result", when($"new_col" === 0, 0).otherwise(expr(myExpression)))
//df6.show(false)
df6.createOrReplaceTempView("ranges3")

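// Range "starters": the first row for each name (new_col still holds the sentinel default) or any row whose start is more than one day after the previous range's end.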
val q3 = spark.sql("""  SELECT *, dense_rank() over (PARTITION BY name ORDER BY start ASC) as RANK
                          FROM ranges3
                          WHERE new_col = -99999999 OR result > 1
                   """)
q3.createOrReplaceTempView("rangesSTARTS")

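// Range "followers": rows that overlap or directly follow the previous range (start at most one day after its end).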
val q4 = spark.sql("""  SELECT *
                          FROM ranges3
                         WHERE result <= 1 AND new_col <> -99999999 
                   """)
q4.createOrReplaceTempView("rangesFOLLOWERS")

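// Pair each starter with the start of the next starter for the same name (RANK + 1), so its followers can later be bounded to [start, next_start).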
val q5 = spark.sql("""  SELECT r1.*, r2.start as next_start
                          FROM rangesSTARTS r1 LEFT JOIN rangesSTARTS r2
                           ON r2.name = r1.name 
                          AND r2.RANK = r1.RANK + 1 
                   """)
//q5.show(false)

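// If there is no next starter, use a far-future sentinel date as the upper bound.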
val q6 = q5.withColumn("end_period", when($"next_start".isNull, "2525-01-01").otherwise($"next_start"))
//q6.show(false)
q6.createOrReplaceTempView("rangesSTARTS2")

// Second and final set of results - the head and tail of such set of range records.
val result2 = spark.sql("""  SELECT r1.name, r1.start, MAX(r2.end) as end
                               FROM rangesFOLLOWERS r2, rangesSTARTS2 r1
                              WHERE r2.name = r1.name
                                AND r2.end >= r1.start 
                                AND r2.end <  r1.end_period
                           GROUP BY r1.name, r1.start """)   
result2.show(false)

val finalresult = result1.union(result2)
finalresult.show

This returns:

+-----+----------+----------+
| name|     start|       end|
+-----+----------+----------+
|  Bob|2018-09-01|2018-09-02|
|Betty|2017-09-10|2017-09-15|
|  YYY|2017-09-04|2017-09-15|
|  Bob|2018-09-04|2018-09-22|
|  Bob|2019-09-04|2019-09-07|
|  XXX|2017-09-04|2017-09-15|
|Betty|2018-09-05|2018-09-15|
+-----+----------+----------+

An interesting comparison - which is better in terms of performance and style? It has been a while since my last effort like this, and I'd be interested in your comments. You know the programming aspects better than I do, so this question provides some good comparison and some good education. The other solution does explode the ranges into individual days, whereas mine does not, as far as I can see.