gia*_*dau 14 apache-kafka apache-spark spark-streaming kafka-consumer-api
我Kafka 0.8.2用来从AdExchange接收数据然后我Spark Streaming 1.4.1用来存储数据MongoDB.
我的问题是当我重新启动我的Spark StreamingJob时,例如更新新版本,修复bug,添加新功能.它将继续阅读最新offset的kafka重启作业期间在当时那么我将数据丢失的AdX推卡夫卡.
我尝试类似的东西,auto.offset.reset -> smallest但它会从0 - >收到最后数据是巨大的,并在数据库中重复.
我也尝试设置具体的group.id和consumer.id以Spark却是相同的.
如何保存最新的offset消耗,火花zookeeper或kafka然后可以从读回最新的offset?
Mic*_*iov 15
createDirectStream函数的构造函数之一可以获取一个映射,该映射将分区id作为键,并将您开始使用的偏移量作为值.
请看这里的api:http ://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html我所谈论的地图通常称为:fromOffsets
您可以将数据插入地图:
startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)
Run Code Online (Sandbox Code Playgroud)
并在创建直接流时使用它:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))
Run Code Online (Sandbox Code Playgroud)
每次迭代后,您可以使用以下方法获取已处理的偏移:
rdd.asInstanceOf[HasOffsetRanges].offsetRanges
Run Code Online (Sandbox Code Playgroud)
您将能够使用此数据在下一次迭代中构造fromOffsets映射.
您可以在此处查看完整的代码和用法:https://spark.apache.org/docs/latest/streaming-kafka-integration.html页面末尾
| 归档时间: |
|
| 查看次数: |
14714 次 |
| 最近记录: |