Ray*_*y J 7 apache-kafka apache-spark spark-structured-streaming
编程指南说结构化流媒体使用适当的源/接收器确保端到端的语义.
但是,当作业崩溃并且我们应用了水印时,我不理解它是如何工作的.
下面是我目前想象它如何工作的一个例子,请纠正我在任何我误解的点.提前致谢!
例:
Spark Job:每1小时窗口计数#个事件,1个小时的水印.
消息:
我们开始工作,从源头读取A,B,C,并在我们将它们写入我们的接收器之前的上午10:30崩溃.
下午6点,作业恢复正常,并且知道使用保存的检查点/ WAL重新处理A,B,C.10-11am窗口的最终计数为3.
接下来,它并行读取来自Kafka,X,Y,Z的新消息,因为它们属于不同的分区.首先处理Z,因此最大事件时间戳设置为晚上8点.当作业读取X和Y时,它们现在位于水印之后(晚上8点 - 1小时=晚上7点),因此它们将作为旧数据丢弃.最终计数是1到8-9pm,并且该作业没有报告12-1pm窗口的任何内容.我们丢失了X和Y的数据.
---结束例子---
这种情况准确吗?如果是这样,1小时水印可能足以处理从Kafka-Sspark正常流动时的延迟/无序数据,但是当火花作业停止/ Kafka连接长时间丢失时则不会.避免数据丢失的唯一选择是使用比您预期的工作更长的水印吗?
在miniatch期间,水印是固定值.在您的示例中,由于X,Y和Z在同一个小批处理中处理,因此用于此记录的水印将是上午9:20.完成该小批量水印后,将更新至晚上7点.
低于SPARK-18124功能设计文档的引用,该功能实现了水印功能:
要在基于触发器的执行中计算下降边界,我们必须执行以下操作.
- 在每个触发器中,在聚合数据的同时,我们还会在触发器数据中扫描事件时间的最大值
- 触发完成后,计算watermark = MAX(触发前的事件时间,触发时的最大事件时间) - 阈值
可能模拟会更多描述:
import org.apache.hadoop.fs.Path
import java.sql.Timestamp
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime
val dir = new Path("/tmp/test-structured-streaming")
val fs = dir.getFileSystem(sc.hadoopConfiguration)
fs.mkdirs(dir)
val schema = StructType(StructField("vilue", StringType) ::
StructField("timestamp", TimestampType) ::
Nil)
val eventStream = spark
.readStream
.option("sep", ";")
.option("header", "false")
.schema(schema)
.csv(dir.toString)
// Watermarked aggregation
val eventsCount = eventStream
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.count
def writeFile(path: Path, data: String) {
val file = fs.create(path)
file.writeUTF(data)
file.close()
}
// Debug query
val query = eventsCount.writeStream
.format("console")
.outputMode("complete")
.option("truncate", "false")
.trigger(ProcessingTime("5 seconds"))
.start()
writeFile(new Path(dir, "file1"), """
|A;2017-08-09 10:00:00
|B;2017-08-09 10:10:00
|C;2017-08-09 10:20:00""".stripMargin)
query.processAllAvailable()
val lp1 = query.lastProgress
// -------------------------------------------
// Batch: 0
// -------------------------------------------
// +---------------------------------------------+-----+
// |window |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 |
// +---------------------------------------------+-----+
// lp1: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
// ...
// "numInputRows" : 3,
// "eventTime" : {
// "avg" : "2017-08-09T10:10:00.000Z",
// "max" : "2017-08-09T10:20:00.000Z",
// "min" : "2017-08-09T10:00:00.000Z",
// "watermark" : "1970-01-01T00:00:00.000Z"
// },
// ...
// }
writeFile(new Path(dir, "file2"), """
|Z;2017-08-09 20:00:00
|X;2017-08-09 12:00:00
|Y;2017-08-09 12:50:00""".stripMargin)
query.processAllAvailable()
val lp2 = query.lastProgress
// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +---------------------------------------------+-----+
// |window |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2 |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1 |
// +---------------------------------------------+-----+
// lp2: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
// ...
// "numInputRows" : 3,
// "eventTime" : {
// "avg" : "2017-08-09T14:56:40.000Z",
// "max" : "2017-08-09T20:00:00.000Z",
// "min" : "2017-08-09T12:00:00.000Z",
// "watermark" : "2017-08-09T09:20:00.000Z"
// },
// "stateOperators" : [ {
// "numRowsTotal" : 3,
// "numRowsUpdated" : 2
// } ],
// ...
// }
writeFile(new Path(dir, "file3"), "")
query.processAllAvailable()
val lp3 = query.lastProgress
// -------------------------------------------
// Batch: 2
// -------------------------------------------
// +---------------------------------------------+-----+
// |window |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3 |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2 |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1 |
// +---------------------------------------------+-----+
// lp3: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
// ...
// "numInputRows" : 0,
// "eventTime" : {
// "watermark" : "2017-08-09T19:00:00.000Z"
// },
// "stateOperators" : [ ],
// ...
// }
query.stop()
fs.delete(dir, true)
Run Code Online (Sandbox Code Playgroud)
注意批处理0以水印开始时批处理0如何1970-01-01 00:00:00
启动水印2017-08-09 09:20:00
(批处理0的最大事件时间减去1小时).批次2,虽然为空,使用水印2017-08-09 19:00:00
.