Tar*_*arg 1 apache-kafka apache-spark spark-streaming kafka-consumer-api
我的 Spark Streaming 工作正在消耗来自 Kafka 的数据
KafkaUtils.createStream(jssc, prop.getProperty(Config.ZOOKEEPER_QUORUM),
prop.getProperty(Config.KAFKA_CONSUMER_GROUP), topicMap);
Run Code Online (Sandbox Code Playgroud)
每当我重新启动我的工作时,它就会开始从最后一个偏移存储开始消耗(我假设这是因为发送处理后的数据需要花费大量时间,并且如果我更改消费者组,它会立即处理新消息)
我是 kafka 8.1.1,其中 auto.offset.reset 默认为最大,这意味着每当我重新启动 kafka 都会从我离开的位置发送数据。
我的用例要求我忽略这些数据并仅处理到达的数据。我怎样才能做到这一点?任何建议
小智 5
有两种方法可以实现此目的:
每次重新启动时创建一个唯一的消费者组,它将从最新的偏移量开始消费。
使用直接方法而不是基于接收器的方法;在这里你可以更好地控制你的消费方式,但必须手动更新zookeeper来存储你的偏移量。在下面的示例中,它将始终从最近的偏移量开始。
import org.apache.spark.streaming.kafka._
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
Run Code Online (Sandbox Code Playgroud)有关直接方法的文档在这里: https: //spark.apache.org/docs/latest/streaming-kafka-integration.html
| 归档时间: |
|
| 查看次数: |
3385 次 |
| 最近记录: |