Multiple writeStreams with Spark Structured Streaming

Asked by sca*_*ode. Tags: apache-spark, spark-structured-streaming

I am using Spark Structured Streaming and running into a problem when trying to run multiple write streams. Here is my code:

DataWriter.writeStreamer(firstTableData,"parquet",CheckPointConf.firstCheckPoint,OutputConf.firstDataOutput)
DataWriter.writeStreamer(secondTableData,"parquet",CheckPointConf.secondCheckPoint,OutputConf.secondDataOutput)
DataWriter.writeStreamer(thirdTableData,"parquet", CheckPointConf.thirdCheckPoint,OutputConf.thirdDataOutput)

where writeStreamer is defined as follows:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.OutputMode

def writeStreamer(input: DataFrame, format: String, checkPointFolder: String, output: String) = {

  val query = input
                .writeStream
                .format(format)
                .option("checkpointLocation", checkPointFolder)
                .option("path", output)
                .outputMode(OutputMode.Append)
                .start()

  // Blocks the calling thread until this query terminates
  query.awaitTermination()
}

The problem I am facing is that only the first table is written by the Spark writeStream; nothing happens for the other tables. Do you have any idea what is going wrong?

Answered by bp2*_*010:

query.awaitTermination() should only be called after the last stream has been created. Because awaitTermination() blocks, your first call to writeStreamer never returns, so the second and third queries are never started.

The writeStreamer function can be modified to return a StreamingQuery at that point instead of calling awaitTermination (since it is blocking):

import org.apache.spark.sql.streaming.StreamingQuery

def writeStreamer(input: DataFrame, format: String, checkPointFolder: String, output: String): StreamingQuery = {
  input
    .writeStream
    .format(format)
    .option("checkpointLocation", checkPointFolder)
    .option("path", output)
    .outputMode(OutputMode.Append)
    .start()
}

Then you will have:

val query1 = DataWriter.writeStreamer(...)
val query2 = DataWriter.writeStreamer(...)
val query3 = DataWriter.writeStreamer(...)

query3.awaitTermination()
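As a side note: awaiting only query3 means the driver may exit if query3 stops while the other two queries are still running. A minimal sketch of an alternative, assuming the queries were started from a SparkSession held in a variable named spark, is to block on the StreamingQueryManager instead of a single query:

import org.apache.spark.sql.SparkSession

// Assumption: `spark` is the SparkSession that was used to start query1, query2 and query3 above.
val spark: SparkSession = SparkSession.builder().getOrCreate()

// Block until ANY of the active streaming queries terminates (for example on failure),
// rather than waiting on query3 specifically.
spark.streams.awaitAnyTermination()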