如何保存Spark消耗给ZK或Kafka的最新偏移量,并在重启后可以回读

gia*_*dau 14 apache-kafka apache-spark spark-streaming kafka-consumer-api

Kafka 0.8.2用来从AdExchange接收数据然后我Spark Streaming 1.4.1用来存储数据MongoDB.

我的问题是当我重新启动我的Spark StreamingJob时,例如更新新版本,修复bug,添加新功能.它将继续阅读最新offsetkafka重启作业期间在当时那么我将数据丢失的AdX推卡夫卡.

我尝试类似的东西,auto.offset.reset -> smallest但它会从0 - >收到最后数据是巨大的,并在数据库中重复.

我也尝试设置具体的group.idconsumer.idSpark却是相同的.

如何保存最新的offset消耗,火花zookeeperkafka然后可以从读回最新的offset

Mic*_*iov 15

createDirectStream函数的构造函数之一可以获取一个映射,该映射将分区id作为键,并将您开始使用的偏移量作为值.

请看这里的api:http ://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html我所谈论的地图通常称为:fromOffsets

您可以将数据插入地图:

startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)
Run Code Online (Sandbox Code Playgroud)

并在创建直接流时使用它:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
                streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))
Run Code Online (Sandbox Code Playgroud)

每次迭代后,您可以使用以下方法获取已处理的偏移:

rdd.asInstanceOf[HasOffsetRanges].offsetRanges
Run Code Online (Sandbox Code Playgroud)

您将能够使用此数据在下一次迭代中构造fromOffsets映射.

您可以在此处查看完整的代码和用法:https://spark.apache.org/docs/latest/streaming-kafka-integration.html页面末尾

  • 任何可靠的存储应该做的工作.我通常将数据保存到HDFS,因为我认为这是最简单的解决方案.我想不出Redis无法完成这项工作的原因. (3认同)
  • 其中一个选项是我告诉你使用.offsetRanges数据结构.在给定的迭代中处理流后,您可以执行以下操作:`dStream.foreachRDD {rdd => val x = rdd.asInstanceOf [HasOffsetRanges] .offsetRanges; //用X做一些事情(比如保存外部FS)}`x将保存RDD的每个主题分区组合的最后一个处理过的偏移量.如果你需要一次只有语义,你必须手动支持它,但它是可能的. (2认同)
  • 我认为我不想保存在外部存储器中,因为ZK和Kafka可以处理这个问题. (2认同)