Har*_*nna 2 apache-spark spark-structured-streaming
我正在为 Spark 结构化流编写一个存储编写器,它将对给定的数据帧进行分区并写入不同的 Blob 存储帐户。Spark 文档说它确保了exactly once文件接收器的语义,但也说只有当源可重播并且接收器是幂等时才可能实现一次语义。
如果我以 parquet 格式写入,blob 存储是幂等接收器吗?
如果我这样做的话,行为会发生什么变化streamingDF.writestream.foreachbatch(...writing the DF here...).start()?它仍然能保证一次语义吗?
可能重复:如何获取结构化查询的 Kafka 偏移量以进行手动可靠的偏移量管理?
更新#1:类似 -
output
.writeStream
.foreachBatch((df: DataFrame, _: Long) => {
path = storagePaths(r.nextInt(3))
df.persist()
df.write.parquet(path)
df.unpersist()
})
Run Code Online (Sandbox Code Playgroud)
我假设问题是关于微批量流处理(而不是连续流处理)。
基于可用的和已提交的偏移量内部注册表(用于当前流执行,又名runId)以及常规检查点(以在重新启动时保留处理状态)来保证恰好一次语义。
仅当源可重播并且接收器是幂等的时,恰好一次语义才是可能的。
任何已经处理过但内部未正确记录的内容(见下文)都可以重新处理:
这意味着流查询中的所有流源都应该是可重新播放的,以允许轮询曾经请求的数据。
这也意味着接收器应该是幂等的,因此已成功处理并添加到接收器的数据可能会再次添加,因为在结构化流设法将数据(偏移量)记录为成功处理(在检查点)之前发生了故障
在处理任何流式源或读取器的可用数据(按偏移量)之前,MicroBatchExecution将偏移量提交到预写日志 (WAL) 并将以下 INFO 消息打印到日志中:
批次 [currentBatchId] 的提交偏移量。元数据 [offsetSeqMetadata]
仅当有新数据可用(基于偏移量)或最后一次执行需要另一个微批次进行状态管理时,才会执行流式查询(微批次)。
在addBatch阶段,MicroBatchExecution请求唯一Sink或StreamWriteSupport处理可用数据。
一旦微批次成功完成,就会MicroBatchExecution将可用偏移量提交到提交检查点,并且偏移量被视为已处理。
MicroBatchExecution将以下 DEBUG 消息打印到日志中:
已完成批次 [currentBatchId]
| 归档时间: |
|
| 查看次数: |
2646 次 |
| 最近记录: |