我有两个流数据集,我们将它们称为fastStream和slowStream。
这fastStream是我通过结构化流 API 从 Kafka 消费的流数据集。我预计每秒可能会收到数千条消息。
这slowStream实际上是一个引用(或查找)表,正在由另一个流“更新插入”,并且包含我想要在将fastStream记录保存到表之前加入到每条消息的数据。仅当有人更改元数据时才会slowStream更新,这可能随时发生,但我们预计可能每隔几天更改一次。
中的每条记录都fastStream将具有一条对应的消息slowStream,我本质上希望使该连接立即与表中的任何数据发生slowStream。我不想等到新数据到达时看看是否会发生潜在的匹配slowStream。
我遇到的问题是,根据 Spark 文档:
因此,对于这两个输入流,我们将过去的输入缓冲为流状态,以便我们可以将每个未来的输入与过去的输入进行匹配,并相应地生成连接结果。
我尝试添加水印,fastStream但我认为它没有效果,因为文档表明需要在连接中引用带水印的列
理想情况下我会写这样的东西:
# Apply a watermark to the fast stream
fastStream = spark.readStream \
.format("delta") \
.load("dbfs:/mnt/some_file/fastStream") \
.withWatermark("timestamp", "1 hour") \
.alias("fastStream")
# The slowStream cannot be watermarked since it is only slowly changing
slowStream = spark.readStream \
.format("delta") \
.load("dbfs:/mnt/some_file/slowStream") \
.alias("slowStream")
# Prevent …Run Code Online (Sandbox Code Playgroud) apache-spark spark-streaming pyspark spark-structured-streaming