如何将完全输出模式下的流聚合保存到 Parquet?

Mah*_*and 5 scala apache-spark parquet spark-structured-streaming

我已经使用完整模式在流数据帧上应用了聚合。为了在本地保存数据帧,我实现了接收foreach器。我能够以文本形式保存数据框。但我需要以 Parquet 格式保存它。

val writerForText = new ForeachWriter[Row] {
    var fileWriter: FileWriter = _

    override def process(value: Row): Unit = {
      fileWriter.append(value.toSeq.mkString(","))
    }

    override def close(errorOrNull: Throwable): Unit = {
      fileWriter.close()
    }

    override def open(partitionId: Long, version: Long): Boolean = {
      FileUtils.forceMkdir(new File(s"src/test/resources/${partitionId}"))
      fileWriter = new FileWriter(new File(s"src/test/resources/${partitionId}/temp"))
      true

    }
  }

val columnName = "col1"
frame.select(count(columnName),count(columnName),min(columnName),mean(columnName),max(columnName),first(columnName), last(columnName), sum(columnName))
              .writeStream.outputMode(OutputMode.Complete()).foreach(writerForText).start()
Run Code Online (Sandbox Code Playgroud)

我怎样才能做到这一点?提前致谢!

Jac*_*ski 0

为了将数据帧保存在本地,我实现了 foreach 接收器。我能够以文本形式保存数据框。但我需要将其保存为镶木地板格式。

保存流数据集时的默认格式是... parquet。话虽如此,您不必使用相当先进的foreach水槽,而只需使用parquet

查询可以如下所示:

scala> :type in
org.apache.spark.sql.DataFrame

scala> in.isStreaming
res0: Boolean = true

in.writeStream.
  option("checkpointLocation", "/tmp/checkpoint-so").
  start("/tmp/parquets")
Run Code Online (Sandbox Code Playgroud)

  • 结构化流不允许我们以完整模式将数据帧写入除内存之外的任何接收器。为了保存它,我们必须实现 foreach 接收器。我们无法按照您建议的完整模式执行此操作。 (2认同)
  • 如果您尝试执行此操作,您会收到 IllegalArgumentException,指出“数据源 parquet 不支持完整输出模式”。 (2认同)