Spark在水槽中构建流式一致性

mat*_*ieu 7 apache-spark spark-streaming

在下面的例子中,我想更好地理解Spark 2.2结构化流的一致性模型:

  • 一个来源(Kinesis)
  • 从此源向2个不同的接收器发出2个查询:一个文件接收器用于存档目的(S3),另一个接收器用于处理数据(DB或文件,尚未确定)

我想了解是否在汇点之间存在任何一致性保证,至少在某些情况下:

  • 水槽中的一个可以领先于另一个水槽吗?或者他们在源上以相同的速度消耗数据(因为它是相同的源)?它们可以同步吗?
  • 如果我(优雅地)停止流应用程序,2接收器上的数据是否一致?

原因是我想构建一个类似Kappa的处理应用程序,能够在我想重新处理某些历史记录时暂停/关闭流媒体部分,并且当我恢复流式传输时,避免重新处理已经处理的内容(如在历史中),或遗漏了一些(例如,一些尚未提交到存档的数据,然后在流式恢复时已经处理的数据被跳过)

Sil*_*vio 10

要记住的一件重要事情是2个不同的查询将使用2个接收器,每个查询独立于源.因此,检查点是按查询完成的.

每当您调用start一个DataStreamWriter导致查询时,如果您设置checkpointLocation每个查询将有自己的检查点来跟踪接收器的偏移量.

val input = spark.readStream....

val query1 = input.select('colA, 'colB)
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir1")
  .start("/path1")

val query2 = input.select('colA, 'colB)
  .writeStream
  .format("csv")
  .option("checkpointLocation", "path/to/checkpoint/dir2")
  .start("/path2")
Run Code Online (Sandbox Code Playgroud)

因此,每个查询都是从源读取并独立跟踪偏移量.这也意味着,每个查询可以处于输入流的不同偏移量,您可以重新启动其中一个或两个,而不会影响另一个.

  • 谢谢@Silvio.顺便说一下,在你的示例中,我发现了一个关于检查点的棘手问题:如果通过`option()指定`一个需要手动拆分`dir1`和`dir2`,而如果全局指定`spark.sql.streaming.checkpointLocation `,附加后缀(`queryName`或UUID).ref [Spark code](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L198) (2认同)