I'm reading the Structured Streaming documentation.

On one hand, if I understand it correctly, under "Policy for handling multiple watermarks" it says that if two streams have different watermarks, Spark will use either the minimum (the default) or the maximum (if you explicitly configure it) of the two as the global watermark, so Spark will ignore the other watermark.

On the other hand, under inner joins with optional watermarking there is an example of two streams with different watermarks, and it says that for each stream the watermark specified on it will be used (rather than only the min or the max of the two serving as a global watermark for both).

Maybe I'm not understanding what the section on handling multiple watermarks is actually trying to explain, because it says that if multipleWatermarkPolicy is set to max, the global watermark will move at the pace of the fastest stream, but that seems to be exactly the opposite, since a bigger watermark means the stream is slower.
If I understand you correctly, you want to know how multiple watermarks behave for join operations, right? If so, I dug into the implementation to find the answer.
The spark.sql.streaming.multipleWatermarkPolicy property applies globally to all operations involving multiple watermarks, and its default value is min. You can figure this out by looking at the WatermarkTracker#updateWatermark(executedPlan: SparkPlan) method, which is called by MicroBatchExecution#runBatch. runBatch is in turn invoked by org.apache.spark.sql.execution.streaming.StreamExecution#runStream, the class responsible for... stream execution ;)
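For instance, this is how the policy can be switched (a minimal sketch; the session settings here are purely illustrative, only the config key itself comes from Spark):

import org.apache.spark.sql.SparkSession

// Illustrative local session; "min" is the default policy.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("multiple-watermark-policy-demo")
  .config("spark.sql.streaming.multipleWatermarkPolicy", "max")
  .getOrCreate()

// Or via the runtime config, before the query starts:
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "min")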
The updateWatermark implementation first collects all event-time watermark nodes from the physical plan:
val watermarkOperators = executedPlan.collect {
  case e: EventTimeWatermarkExec => e
}
if (watermarkOperators.isEmpty) return

watermarkOperators.zipWithIndex.foreach {
  case (e, index) if e.eventTimeStats.value.count > 0 =>
    logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
    val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
    val prevWatermarkMs = operatorToWatermarkMap.get(index)
    if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
      operatorToWatermarkMap.put(index, newWatermarkMs)
    }

  // Populate 0 if we haven't seen any data yet for this watermark node.
  case (_, index) =>
    if (!operatorToWatermarkMap.isDefinedAt(index)) {
      operatorToWatermarkMap.put(index, 0)
    }
}
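To make the arithmetic concrete, here is a toy, self-contained sketch (my simplification, not Spark code) of what that loop computes: each operator's watermark is its max observed event time minus its declared delay, and the per-operator map only ever moves forward:

import scala.collection.mutable

// Hypothetical stats: (operator index, max observed event time ms, delay ms).
val observed = Seq((0, 20000L, 4000L), (1, 20000L, 8000L))

val operatorToWatermarkMap = mutable.Map[Int, Long]()
observed.foreach { case (index, maxEventTimeMs, delayMs) =>
  val newWatermarkMs = maxEventTimeMs - delayMs
  val prev = operatorToWatermarkMap.get(index)
  if (prev.isEmpty || newWatermarkMs > prev.get) { // never move backwards
    operatorToWatermarkMap.put(index, newWatermarkMs)
  }
}
// Result: Map(0 -> 16000, 1 -> 12000); the min policy would pick 12000, max 16000.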
To give you an idea, the physical plan of a stream-to-stream join can look like this:
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@6a1dff1d
+- StreamingSymmetricHashJoin [mainKey#10730], [joinedKey#10733], Inner, condition = [ leftOnly = null, rightOnly = null, both = (mainEventTimeWatermark#10732-T4000ms >= joinedEventTimeWatermark#10735-T8000ms), full = (mainEventTimeWatermark#10732-T4000ms >= joinedEventTimeWatermark#10735-T8000ms) ], state info [ checkpoint = file:/tmp/temporary-3416be37-81b4-471a-b2ca-9b8f8593843a/state, runId = 17a4e028-29cb-41b0-b34b-44e20409b335, opId = 0, ver = 13, numPartitions = 200], 389000, state cleanup [ left value predicate: (mainEventTimeWatermark#10732-T4000ms <= 388999000), right = null ]
:- Exchange hashpartitioning(mainKey#10730, 200)
: +- *(2) Filter isnotnull(mainEventTimeWatermark#10732-T4000ms)
: +- EventTimeWatermark mainEventTimeWatermark#10732: timestamp, interval 4 seconds
: +- *(1) Filter isnotnull(mainKey#10730)
: +- *(1) Project [mainKey#10730, mainEventTime#10731L, mainEventTimeWatermark#10732]
: +- *(1) ScanV2 MemoryStreamDataSource$[mainKey#10730, mainEventTime#10731L, mainEventTimeWatermark#10732]
+- Exchange hashpartitioning(joinedKey#10733, 200)
+- *(4) Filter isnotnull(joinedEventTimeWatermark#10735-T8000ms)
+- EventTimeWatermark joinedEventTimeWatermark#10735: timestamp, interval 8 seconds
+- *(3) Filter isnotnull(joinedKey#10733)
+- *(3) Project [joinedKey#10733, joinedEventTime#10734L, joinedEventTimeWatermark#10735]
+- *(3) ScanV2 MemoryStreamDataSource$[joinedKey#10733, joinedEventTime#10734L, joinedEventTimeWatermark#10735]
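For reference, a query shaped roughly like this produces such a plan (a sketch: mainEventsDf and joinedEventsDf are assumed streaming DataFrames whose column names match the plan above). Note that each side keeps its own EventTimeWatermark node with its own delay:

import org.apache.spark.sql.functions.expr

val mainWithWatermark = mainEventsDf
  .withWatermark("mainEventTimeWatermark", "4 seconds")
val joinedWithWatermark = joinedEventsDf
  .withWatermark("joinedEventTimeWatermark", "8 seconds")

// Inner stream-to-stream join; the time condition mirrors the one in the plan.
val joined = mainWithWatermark.join(
  joinedWithWatermark,
  expr("mainKey = joinedKey AND mainEventTimeWatermark >= joinedEventTimeWatermark"))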
Later, updateWatermark uses one of the available watermark policies, MinWatermark or MaxWatermark, depending on the value you set in spark.sql.streaming.multipleWatermarkPolicy. The policy is resolved in the MultipleWatermarkPolicy companion object this way:
def apply(policyName: String): MultipleWatermarkPolicy = {
  policyName.toLowerCase match {
    case DEFAULT_POLICY_NAME => MinWatermark
    case "max" => MaxWatermark
    case _ =>
      throw new IllegalArgumentException(s"Could not recognize watermark policy '$policyName'")
  }
}
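For completeness, the two policy objects are defined in the same file and, unless I'm misreading the 2.4.x source, they simply reduce the collected per-operator watermarks with min or max:

case object MinWatermark extends MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    assert(operatorWatermarks.nonEmpty)
    operatorWatermarks.min
  }
}

case object MaxWatermark extends MultipleWatermarkPolicy {
  def chooseGlobalWatermark(operatorWatermarks: Seq[Long]): Long = {
    assert(operatorWatermarks.nonEmpty)
    operatorWatermarks.max
  }
}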
updateWatermark then uses the resolved policy to compute the watermark applied to the query:
// Update the global watermark to the minimum of all watermark nodes.
// This is the safest option, because only the global watermark is fault-tolerant. Making
// it the minimum of all individual watermarks guarantees it will never advance past where
// any individual watermark operator would be if it were in a plan by itself.
val chosenGlobalWatermark = policy.chooseGlobalWatermark(operatorToWatermarkMap.values.toSeq)
if (chosenGlobalWatermark > globalWatermarkMs) {
  logInfo(s"Updating event-time watermark from $globalWatermarkMs to $chosenGlobalWatermark ms")
  globalWatermarkMs = chosenGlobalWatermark
} else {
  logDebug(s"Event time watermark didn't move: $chosenGlobalWatermark < $globalWatermarkMs")
}
However, I agree that the comment in the last snippet is a bit misleading, since it says "Update the global watermark to the minimum of all watermark nodes" even though with the max policy the chosen value is not the minimum (https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala#L109).
The behavior with multiple watermarks is also asserted in EventTimeWatermarkSuite. Even though it exercises a UNION, you can see from the first two parts that the watermark is updated the same way for all operations combining several streams.
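The shape of the query under test is roughly this (a sketch, not the suite verbatim; stream1Df and stream2Df are assumed streaming DataFrames with the same schema):

// For the same observed event times, the min policy yields the smaller
// watermark (here, the 15-second side) and the max policy the larger one.
val unioned = stream1Df.withWatermark("eventTime", "10 seconds")
  .union(stream2Df.withWatermark("eventTime", "15 seconds"))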
To debug this yourself, look for the following entries in the logs:
- [2019-07-05 08:30:09,729] org.apache.spark.internal.Logging$class INFO Streaming query made progress: this entry carries all the information about each executed query. In its eventTime section you will find the watermark property, which should differ if you run the same query once with the min and once with the max multipleWatermarkPolicy.
- [2019-07-05 08:30:35,685] org.apache.spark.internal.Logging$class INFO Updating event-time watermark from 0 to 6000 ms (org.apache.spark.sql.execution.streaming.WatermarkTracker:54): this entry says that the watermark has just changed. As before, the value should differ depending on the min/max property.

To sum up: since 2.4.0 we can choose one watermark strategy, min or max. Before 2.4.0, the min watermark was the default choice (SPARK-24730). And this is independent of the operation type (inner join, outer join, and so on), because the watermark resolution method is the same for all queries.
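As an aside, instead of grepping the logs you can read the same watermark programmatically (a sketch; eventTime on StreamingQueryProgress is a string map, and the "watermark" entry may be absent before any data arrives):

import org.apache.spark.sql.streaming.StreamingQuery

// `query` is an assumed running StreamingQuery.
def currentWatermark(query: StreamingQuery): Option[String] =
  Option(query.lastProgress).flatMap(p => Option(p.eventTime.get("watermark")))

// Running the same query under min and then under max should eventually
// return different values here.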