JC4*_*417 5 scala apache-spark spark-structured-streaming
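I'm basically using the example from the Spark documentation with the built-in "rate" test stream, where one stream is 3 seconds ahead of the other (I originally used Kafka but ran into the same problem). The join correctly returns the matching columns, but after a while the same key is returned again, this time with the outer side null.
Is this expected behavior? When there is a match, is there a way to exclude the duplicate outer-null result?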
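For reference, the contract a left outer join is documented to follow is that each left-side row comes out either joined with its matching right-side rows or, if nothing matches, exactly once with nulls. It is never both. The following is a minimal, Spark-free Scala model of that contract using the question's join condition; it is not how Spark implements the join. The names `Impression`, `Click` and `LeftOuterModel` are hypothetical, and the sample data assumes the rate source's 5 rows/second (one row every 200 ms).

```scala
// Hypothetical, Spark-free model of left outer join semantics (not Spark's implementation).
final case class Impression(adId: Long, timeMs: Long)
final case class Click(adId: Long, timeMs: Long)

object LeftOuterModel {
  // clickTime <= impressionTime + interval 10 seconds
  val windowMs: Long = 10000L

  // The question's join condition
  def matches(i: Impression, c: Click): Boolean =
    c.adId == i.adId && c.timeMs >= i.timeMs && c.timeMs <= i.timeMs + windowMs

  // Each impression is paired with every matching click, or emitted exactly once
  // with None (the "outer null") when no click matches.
  def leftOuter(imps: Seq[Impression], clicks: Seq[Click]): Seq[(Impression, Option[Click])] =
    imps.flatMap { i =>
      val ms = clicks.filter(c => matches(i, c))
      if (ms.isEmpty) Seq((i, Option.empty[Click]))
      else ms.map(c => (i, Option(c)))
    }
}

// Assumed sample data mimicking the rate source: value n arrives at n * 200 ms
val imps = (0L until 20L).map(n => Impression(n + 15, n * 200))
val clicks = (0L until 20L).map(n => Click(n, n * 200))
val out = LeftOuterModel.leftOuter(imps, clicks)

// Ad ids that matched vs. ad ids that came out null-padded
val matchedIds = out.collect { case (i, Some(_)) => i.adId }.toSet
val nullIds = out.collect { case (i, None) => i.adId }.toSet
println(s"matched: ${matchedIds.toSeq.sorted}, null-padded: ${nullIds.size}")
```

In this model the two sets never overlap. The behavior described in the question, where a key that already matched later appears again with nulls, falls outside that contract.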
Code:
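import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr}

// Local session for the repro (the original snippet assumes `session` already exists)
val session = SparkSession.builder()
  .appName("stream-stream-left-outer-join")
  .master("local[*]")
  .getOrCreate()

// Built-in "rate" test source: emits (timestamp, value) rows, 5 per second
val testStream = session.readStream.format("rate")
  .option("rowsPerSecond", "5").option("numPartitions", "1").load()

// Impression ad ids are shifted by 15 rows (about 3 seconds at 5 rows/s),
// so each click matches the impression generated 15 rows earlier
val impressions = testStream
  .select(
    (col("value") + 15).as("impressionAdId"),
    col("timestamp").as("impressionTime"))

val clicks = testStream
  .select(
    col("value").as("clickAdId"),
    col("timestamp").as("clickTime"))

// Apply watermarks on event-time columns
val impressionsWithWatermark =
  impressions.withWatermark("impressionTime", "20 seconds")
val clicksWithWatermark =
  clicks.withWatermark("clickTime", "30 seconds")

// Join with event-time constraints
val result = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 10 seconds
    """),
  joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter"
)

// Note: the Structured Streaming guide documents stream-stream joins as
// supported only in Append output mode
val query = result.writeStream.outputMode("update").format("console").option("truncate", false).start()
query.awaitTermination()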
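The timing in this repro can be worked through by hand. The sketch below is plain Scala (no Spark) and rests on stated assumptions: both streams see the same maximum event time, the two watermarks are combined with Spark's default "min" policy, and micro-batch boundaries are ignored. The object and method names are illustrative only. It suggests why matches show up about 3 seconds apart, and why a null-padded row for an unmatched impression can only become due roughly 40 seconds of event time later. That delay fits the "after a while" in the question, but it does not account for a null row on a key that already matched.

```scala
// Back-of-the-envelope timing model for the repro (assumptions: identical max event
// time on both sides, "min" watermark policy, no micro-batch granularity).
object JoinTiming {
  val rowsPerSecond = 5        // option("rowsPerSecond", "5")
  val adIdOffset = 15          // impressionAdId = value + 15
  val impressionDelaySec = 20  // withWatermark("impressionTime", "20 seconds")
  val clickDelaySec = 30       // withWatermark("clickTime", "30 seconds")
  val joinWindowSec = 10       // clickTime <= impressionTime + interval 10 seconds

  // Seconds between an impression and the click that matches it
  def matchLagSec: Double = adIdOffset.toDouble / rowsPerSecond

  // Global watermark under the "min" policy, given the max event time seen on each side
  def globalWatermark(maxImpressionSec: Double, maxClickSec: Double): Double =
    math.min(maxImpressionSec - impressionDelaySec, maxClickSec - clickDelaySec)

  // Max event time after which no future click can match an impression at
  // `impressionSec` (watermark > impressionSec + window), i.e. when its
  // null-padded row may be emitted, assuming both sides share the same max time
  def nullRowDueAtSec(impressionSec: Double): Double =
    impressionSec + joinWindowSec + math.max(impressionDelaySec, clickDelaySec)
}

println(s"match lag: ${JoinTiming.matchLagSec} s")
println(s"null row for an impression at t=0 due after max event time ${JoinTiming.nullRowDueAtSec(0.0)} s")
```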
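Unfortunately, it looks like you've run into the correctness issue SPARK-26154. A patch is available, but its review has been dragging on.
Given that the patch is fairly large, you probably don't want to try backporting it to your version by hand. I think your best option is to ask the committers to review the patch soon, and to ask for it to be backported to the version line you're using.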