Scala根据时间列将单行拆分为多行

too*_*lik 3 scala apache-spark

我有以下格式的数据框:

|u_name|Date        |Hour |  Content_id|WatchTime(sec)   |
|user1 | 2019-07-28 |  21 |        100 |           10800 |
|user2 | 2019-07-28 |  20 |        101 |            3600 | 
|user3 | 2019-07-28 |  21 |        202 |            7000 | 
Run Code Online (Sandbox Code Playgroud)

我需要将此数据帧转换为以下数据,基本上,我需要每小时创建一个条目,因此,如果WatchTime(sec)超过3600秒,则需要在下一个小时创建一个新条目

|u_name|Date        |Hour |  Content_id|WatchTime(sec)   |
|user1 | 2019-07-28 |  21 |        100 |            3600 |
|user1 | 2019-07-28 |  22 |        100 |            3600 |
|user1 | 2019-07-28 |  23 |        100 |            3600 |
|user2 | 2019-07-28 |  20 |        101 |            3600 | 
|user3 | 2019-07-28 |  21 |        202 |            3600 | 
|user3 | 2019-07-28 |  22 |        202 |            3400 |
Run Code Online (Sandbox Code Playgroud)

可以使用SQL以某种方式实现此目的,但是我正在使用Scala,什么是实现此目的的有效方法。

rlu*_*uta 5

您可以通过以下转换在spark 2.4+中实现此目的:

  • 使用sequence高阶函数将WatchTime分成3600秒的数组
  • 分解数组以生成新行
  • 调整每一行的小时和监视时间值
  • 删除所有WatchTime为零的行
val result = df
   .withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))
   .withColumn("offset", explode('stamps))
   .withColumn("Hour", 'Hour + ('offset/3600).cast("int"))
   .withColumn("WatchTime", 'WatchTime - 'offset)
   .withColumn("WatchTime", when('WatchTime <= 3600, 'WatchTime).otherwise(3600))
   .filter('WatchTime > 0)
   .drop("stamps","offset")

result.show()
+------+-------------------+----+----------+---------+
|u_name|               Date|Hour|Content_id|WatchTime|
+------+-------------------+----+----------+---------+
| user1|2019-07-28 00:00:00|  21|       100|     3600|
| user1|2019-07-28 00:00:00|  22|       100|     3600|
| user1|2019-07-28 00:00:00|  23|       100|     3600|
| user2|2019-07-28 00:00:00|  20|       101|     3600|
| user3|2019-07-28 00:00:00|  21|       202|     3600|
| user3|2019-07-28 00:00:00|  22|       202|     3400|
+------+-------------------+----+----------+---------+
Run Code Online (Sandbox Code Playgroud)

该算法可能会产生比23更高的小时数。如果您需要准确的日期和小时信息,建议您使用单个unix时间戳列结合开始日期和小时,因为它可以让您进行时间处理并正确转换为日期和小时需要的时候。

它看起来像这样:

val result = df
   .withColumn("StartDateTime", unix_timestamp('Date) + ('Hour * 3600 ))
   .withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))  
   .withColumn("offset", explode('stamps))
   .withColumn("StartDateTime", from_unixtime('StartDateTime + 'offset))
   .withColumn("WatchTime", when('WatchTime - 'offset>3600,3600).otherwise('WatchTime - 'offset))
   .filter('WatchTime > 0)
   .select('u_name, 'content_id, 'StartDateTime, 'WatchTime)

result.show
+------+----------+-------------------+---------+
|u_name|content_id|      StartDateTime|WatchTime|
+------+----------+-------------------+---------+
| user1|       100|2019-07-28 21:00:00|     3600|
| user1|       100|2019-07-28 22:00:00|     3600|
| user1|       100|2019-07-28 23:00:00|     3600|
| user2|       101|2019-07-28 20:00:00|     3600|
| user3|       202|2019-07-28 21:00:00|     3600|
| user3|       202|2019-07-28 22:00:00|     3400|
+------+----------+-------------------+---------+
Run Code Online (Sandbox Code Playgroud)