kafka只消费新消息

Tar*_*arg 1 apache-kafka apache-spark spark-streaming kafka-consumer-api

我的 Spark Streaming 工作正在消耗来自 Kafka 的数据

KafkaUtils.createStream(jssc, prop.getProperty(Config.ZOOKEEPER_QUORUM),
                        prop.getProperty(Config.KAFKA_CONSUMER_GROUP), topicMap);
Run Code Online (Sandbox Code Playgroud)

每当我重新启动我的工作时,它就会开始从最后一个偏移存储开始消耗(我假设这是因为发送处理后的数据需要花费大量时间,并且如果我更改消费者组,它会立即处理新消息)

我是 kafka 8.1.1,其中 auto.offset.reset 默认为最大,这意味着每当我重新启动 kafka 都会从我离开的位置发送数据。

我的用例要求我忽略这些数据并仅处理到达的数据。我怎样才能做到这一点?任何建议

小智 5

有两种方法可以实现此目的:

  1. 每次重新启动时创建一个唯一的消费者组,它将从最新的偏移量开始消费。

  2. 使用直接方法而不是基于接收器的方法;在这里你可以更好地控制你的消费方式,但必须手动更新zookeeper来存储你的偏移量。在下面的示例中,它将始终从最近的偏移量开始。

    import org.apache.spark.streaming.kafka._
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    
    Run Code Online (Sandbox Code Playgroud)

有关直接方法的文档在这里: https: //spark.apache.org/docs/latest/streaming-kafka-integration.html