Kafka 结构化流检查点

ran*_*ddy 4 hadoop pyspark spark-structured-streaming

我正在尝试从 Kafka 进行结构化流式传输。我打算在 HDFS 中存储检查点。我读了一篇 Cloudera 博客,建议不要在 HDFS 中为 Spark 流存储检查点。结构流检查点是否存在相同的问题。 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/

在结构化流中,如果我的 spark 程序停机了一段时间,我如何从检查点目录中获取最新的偏移量并在该偏移量之后加载数据。我将检查点存储在一个目录中,如下所示。

 df.writeStream\
        .format("text")\
        .option("path", '\files') \
        .option("checkpointLocation", 'checkpoints\chkpt') \
        .start()
Run Code Online (Sandbox Code Playgroud)

更新:

这是我的结构化流程序读取 Kafka 消息,解压缩并写入 HDFS。

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KafkaServer) \
        .option("subscribe", KafkaTopics) \
        .option("failOnDataLoss", "false")\
         .load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()

decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
#zip_extract is a UDF to decompress the stream

query = decomp.writeStream\
    .format("text")\
    .option("path", \Data_directory_inHDFS) \
    .option("checkpointLocation", \pathinDHFS\) \
    .start()

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

Nam*_*wal 5

在长期存储(HDFS、AWS S3 等)上存储检查点是最优选的。我想在这里补充一点,不应将属性“failOnDataLoss”设置为 false,因为这不是最佳实践。数据丢失是没有人愿意承受的。休息你在正确的道路上。