Bra*_*don 2 apache-spark pyspark spark-structured-streaming
我正在尝试编写一个 Spark 结构化流作业,该作业从 Kafka 主题读取并通过操作写入单独的路径(在执行一些转换之后)writeStream。但是,当我运行以下代码时,仅writeStream执行第一个代码,而第二个代码被忽略。
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
write_one = df.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
.start() \
.awaitTermination()
// transform df to df2
write_two = df2.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
.start() \
.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
我最初认为我的问题与这篇文章有关,但是,将我的代码更改为以下内容后:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
write_one = df.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
.start()
// transform df to df2
write_two = df2.writeStream \
.foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
.start()
write_one.awaitTermination()
write_two.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
Run Code Online (Sandbox Code Playgroud)
我不确定为什么start()和 之间的附加代码awaitTermination()会导致上面的错误(但我认为这可能是一个单独的问题,在上面同一篇文章的答案中引用)。writeStream在同一作业中调用多个操作的正确方法是什么?最好在调用的函数中进行两次写入foreachBatch,还是有更好的方法来实现这一点?
Spark 文档说,如果您需要执行写入多个位置的操作,则需要使用foreachBatch方法。
您的代码应该类似于:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
Run Code Online (Sandbox Code Playgroud)
注意:persist为了防止重新计算,需要这样做。
| 归档时间: |
|
| 查看次数: |
1645 次 |
| 最近记录: |