如何在单个 Spark 作业中调用多个 writeStream 操作?

Bra*_*don 2 apache-spark pyspark spark-structured-streaming

我正在尝试编写一个 Spark 结构化流作业,该作业从 Kafka 主题读取并通过操作写入单独的路径(在执行一些转换之后)writeStream。但是,当我运行以下代码时,仅writeStream执行第一个代码,而第二个代码被忽略。

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

write_one = df.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
  .start() \
  .awaitTermination()

// transform df to df2

write_two = df2.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
  .start() \
  .awaitTermination()
Run Code Online (Sandbox Code Playgroud)

我最初认为我的问题与这篇文章有关,但是,将我的代码更改为以下内容后:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

write_one = df.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
  .start() 

// transform df to df2 
 
write_two = df2.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
  .start()

write_one.awaitTermination()
write_two.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
Run Code Online (Sandbox Code Playgroud)

我不确定为什么start()和 之间的附加代码awaitTermination()会导致上面的错误(但我认为这可能是一个单独的问题,在上面同一篇文章的答案中引用)。writeStream在同一作业中调用多个操作的正确方法是什么?最好在调用的函数中进行两次写入foreachBatch,还是有更好的方法来实现这一点?

Ale*_*s R 5

Spark 文档说,如果您需要执行写入多个位置的操作,则需要使用foreachBatch方法。

您的代码应该类似于:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}
Run Code Online (Sandbox Code Playgroud)

注意:persist为了防止重新计算,需要这样做。

您可以查看更多信息:http://spark.apache.org/docs/latest/structed-streaming-programming-guide.html#using-foreach-and-foreachbatch