too*_*lik 3 scala apache-spark
我有以下格式的数据框:
|u_name|Date |Hour | Content_id|WatchTime(sec) |
|user1 | 2019-07-28 | 21 | 100 | 10800 |
|user2 | 2019-07-28 | 20 | 101 | 3600 |
|user3 | 2019-07-28 | 21 | 202 | 7000 |
Run Code Online (Sandbox Code Playgroud)
我需要将此数据帧转换为以下数据,基本上,我需要每小时创建一个条目,因此,如果WatchTime(sec)
超过3600秒,则需要在下一个小时创建一个新条目
|u_name|Date |Hour | Content_id|WatchTime(sec) |
|user1 | 2019-07-28 | 21 | 100 | 3600 |
|user1 | 2019-07-28 | 22 | 100 | 3600 |
|user1 | 2019-07-28 | 23 | 100 | 3600 |
|user2 | 2019-07-28 | 20 | 101 | 3600 |
|user3 | 2019-07-28 | 21 | 202 | 3600 |
|user3 | 2019-07-28 | 22 | 202 | 3400 |
Run Code Online (Sandbox Code Playgroud)
可以使用SQL以某种方式实现此目的,但是我正在使用Scala,什么是实现此目的的有效方法。
您可以通过以下转换在spark 2.4+中实现此目的:
sequence
高阶函数将WatchTime分成3600秒的数组val result = df
.withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))
.withColumn("offset", explode('stamps))
.withColumn("Hour", 'Hour + ('offset/3600).cast("int"))
.withColumn("WatchTime", 'WatchTime - 'offset)
.withColumn("WatchTime", when('WatchTime <= 3600, 'WatchTime).otherwise(3600))
.filter('WatchTime > 0)
.drop("stamps","offset")
result.show()
+------+-------------------+----+----------+---------+
|u_name| Date|Hour|Content_id|WatchTime|
+------+-------------------+----+----------+---------+
| user1|2019-07-28 00:00:00| 21| 100| 3600|
| user1|2019-07-28 00:00:00| 22| 100| 3600|
| user1|2019-07-28 00:00:00| 23| 100| 3600|
| user2|2019-07-28 00:00:00| 20| 101| 3600|
| user3|2019-07-28 00:00:00| 21| 202| 3600|
| user3|2019-07-28 00:00:00| 22| 202| 3400|
+------+-------------------+----+----------+---------+
Run Code Online (Sandbox Code Playgroud)
该算法可能会产生比23更高的小时数。如果您需要准确的日期和小时信息,建议您使用单个unix时间戳列结合开始日期和小时,因为它可以让您进行时间处理并正确转换为日期和小时需要的时候。
它看起来像这样:
val result = df
.withColumn("StartDateTime", unix_timestamp('Date) + ('Hour * 3600 ))
.withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))
.withColumn("offset", explode('stamps))
.withColumn("StartDateTime", from_unixtime('StartDateTime + 'offset))
.withColumn("WatchTime", when('WatchTime - 'offset>3600,3600).otherwise('WatchTime - 'offset))
.filter('WatchTime > 0)
.select('u_name, 'content_id, 'StartDateTime, 'WatchTime)
result.show
+------+----------+-------------------+---------+
|u_name|content_id| StartDateTime|WatchTime|
+------+----------+-------------------+---------+
| user1| 100|2019-07-28 21:00:00| 3600|
| user1| 100|2019-07-28 22:00:00| 3600|
| user1| 100|2019-07-28 23:00:00| 3600|
| user2| 101|2019-07-28 20:00:00| 3600|
| user3| 202|2019-07-28 21:00:00| 3600|
| user3| 202|2019-07-28 22:00:00| 3400|
+------+----------+-------------------+---------+
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
78 次 |
最近记录: |