Tags: scala, apache-spark, parquet, spark-structured-streaming
I have applied an aggregation on a streaming DataFrame using Complete output mode. To save the DataFrame locally, I implemented a foreach sink. I can save the DataFrame as text, but I need to save it in Parquet format.
import java.io.{File, FileWriter}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val writerForText = new ForeachWriter[Row] {
  var fileWriter: FileWriter = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // Create one output directory per partition.
    FileUtils.forceMkdir(new File(s"src/test/resources/${partitionId}"))
    fileWriter = new FileWriter(new File(s"src/test/resources/${partitionId}/temp"))
    true
  }

  override def process(value: Row): Unit = {
    // Append each row as one comma-separated line.
    fileWriter.append(value.toSeq.mkString(",")).append("\n")
  }

  override def close(errorOrNull: Throwable): Unit = {
    fileWriter.close()
  }
}

val columnName = "col1"
frame
  .select(count(columnName), min(columnName), mean(columnName), max(columnName),
          first(columnName), last(columnName), sum(columnName))
  .writeStream
  .outputMode(OutputMode.Complete())
  .foreach(writerForText)
  .start()
How can I achieve this? Thanks in advance!
To save the dataframe locally, I implemented the foreach sink. I am able to save the dataframe as text. But I need to save it in Parquet format.

The default format when saving a streaming Dataset is... parquet. That said, you don't have to use the fairly advanced foreach sink at all; simply use parquet.

The query could then look as follows:
scala> :type in
org.apache.spark.sql.DataFrame

scala> in.isStreaming
res0: Boolean = true

in.writeStream.
  option("checkpointLocation", "/tmp/checkpoint-so").
  start("/tmp/parquets")
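Once the query is running, each micro-batch is written out as Parquet part files under /tmp/parquets (the checkpointLocation option is mandatory for the file sink, which is why it is set above), and the result can be read back as a regular batch DataFrame with spark.read.parquet("/tmp/parquets").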
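One caveat: the built-in file sink only supports Append output mode, so a Complete-mode aggregation like the one in the question cannot be streamed into it directly. If the aggregation has to stay in Complete mode, foreachBatch (available since Spark 2.4) lets you reuse the ordinary batch Parquet writer on each micro-batch. Below is a minimal sketch, not a definitive implementation: it assumes the streaming DataFrame frame from the question, and the writeBatch helper and both paths are illustrative placeholders.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical helper: writes one snapshot of the aggregation per micro-batch.
// In Complete mode every batch holds the full result, so overwrite the
// previous snapshot instead of appending to it.
def writeBatch(batch: DataFrame, batchId: Long): Unit =
  batch.write.mode("overwrite").parquet("/tmp/parquet-agg")

frame
  .select(count("col1"), min("col1"), mean("col1"), max("col1"),
          first("col1"), last("col1"), sum("col1"))
  .writeStream
  .outputMode(OutputMode.Complete())
  .option("checkpointLocation", "/tmp/checkpoint-agg")
  .foreachBatch(writeBatch _)
  .start()

Since every Complete-mode batch contains the full, updated aggregation result, overwriting a single snapshot directory keeps only the latest state instead of accumulating stale copies.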