Iva*_*ali 5 python sorting apache-spark pyspark
我有一个使用Spark似乎相对简单的用例,但似乎无法找到一个确定的方法来做到这一点.
我有一个数据集,其中包含各种用户的时间序列数据.我要做的就是:
我尝试使用以下代码片段,但最终得到了令人惊讶的结果.我最终得到每个用户ID 1个csv文件,一些用户的时间序列数据最终得到排序,但很多其他用户都没有排序.
# repr(ds) = DataFrame[userId: string, timestamp: string, c1: float, c2: float, c3: float, ...]
ds = load_dataset(user_dataset_path)
ds.repartition("userId")
.sortWithinPartitions("timestamp")
.write
.partitionBy("userId")
.option("header", "true")
.csv(output_path)
Run Code Online (Sandbox Code Playgroud)
我不清楚为什么会发生这种情况,我不完全确定如何做到这一点.我也不确定这是否可能是Spark中的一个错误.
我正在使用Spark 2.0.2和Python 2.7.12.任何建议将非常感谢!
以下代码适用于我(此处以 Scala 所示,但与 Python 类似)。
我为每个用户名获取一个文件,输出文件中的行按时间戳值排序。
testDF
.select( $"username", $"timestamp", $"activity" )
.repartition(col("username"))
.sortWithinPartitions(col("username"),col("timestamp")) // <-- both here
.write
.partitionBy("username")
.mode(SaveMode.Overwrite)
.option("header", "true")
.option("delimiter", ",")
.csv(folder + "/useractivity")
Run Code Online (Sandbox Code Playgroud)
重要的是将用户名和时间戳列作为sortWithinPartitions的参数。
以下是其中一个输出文件的外观(我使用一个简单的整数作为时间戳):
timestamp,activity
345,login
402,upload
515,download
600,logout
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1009 次 |
| 最近记录: |