结构化流如何确保文件接收器的一次性写入语义?

Har*_*nna 2 apache-spark spark-structured-streaming

我正在为 Spark 结构化流编写一个存储编写器,它将对给定的数据帧进行分区并写入不同的 Blob 存储帐户。Spark 文档说它确保了exactly once文件接收器的语义,但也说只有当源可重播并且接收器是幂等时才可能实现一次语义。

  1. 如果我以 parquet 格式写入,blob 存储是幂等接收器吗?

  2. 如果我这样做的话,行为会发生什么变化streamingDF.writestream.foreachbatch(...writing the DF here...).start()?它仍然能保证一次语义吗?

可能重复:如何获取结构化查询的 Kafka 偏移量以进行手动可靠的偏移量管理?

更新#1:类似 -

output
      .writeStream
      .foreachBatch((df: DataFrame, _: Long) => {
        path = storagePaths(r.nextInt(3))

        df.persist()
        df.write.parquet(path)
        df.unpersist()
      })
Run Code Online (Sandbox Code Playgroud)

Jac*_*ski 5

微批流处理

我假设问题是关于微批量流处理(而不是连续流处理)。

基于可用的和已提交的偏移量内部注册表(用于当前流执行,又名runId)以及常规检查点(以在重新启动时保留处理状态)来保证恰好一次语义。

仅当源可重播并且接收器是幂等的时,恰好一次语义才是可能的。

任何已经处理过但内部未正确记录的内容(见下文)都可以重新处理:

  • 这意味着流查询中的所有流源都应该是可重新播放的,以允许轮询曾经请求的数据。

  • 这也意味着接收器应该是幂等的,因此已成功处理并添加到接收器的数据可能会再次添加,因为在结构化流设法将数据(偏移量)记录为成功处理(在检查点)之前发生了故障

内部结构

在处理任何流式源或读取器的可用数据(按偏移量)之前,MicroBatchExecution将偏移量提交到预写日志 (WAL) 并将以下 INFO 消息打印到日志中:

批次 [currentBatchId] 的提交偏移量。元数据 [offsetSeqMetadata]

仅当有新数据可用(基于偏移量)或最后一次执行需要另一个微批次进行状态管理时,才会执行流式查询(微批次)。

addBatch阶段,MicroBatchExecution请求唯一SinkStreamWriteSupport处理可用数据。

一旦微批次成功完成,就会MicroBatchExecution将可用偏移量提交到提交检查点,并且偏移量被视为已处理。

MicroBatchExecution将以下 DEBUG 消息打印到日志中:

已完成批次 [currentBatchId]