add*_*ell 4 java scala apache-kafka apache-spark kafka-consumer-api
我有一个最早的活动要阅读的Kafka主题。
我要做的是从某个主题(从时间上绝对最早的事件)到某个日期的事件获取所有数据。
每个事件的结构都有一个称为的字段dateCliente,我将其用作过滤事件的阈值。到目前为止,我已经成功完成了读写操作。我正在写一个临时拼花文件,用作Hive表的分区。这样做很正常,但是,即使我在auto.offset.reset参数中指定的最早,也不会从头开始读取数据。
每当我运行代码时,我都会从该日期开始获取所有事件。每次我再次执行该代码时,它都会在我上次执行代码中读到的最后一个事件之后继续从Kafka事件中读取数据。
我用于配置Kafka Consumer和订阅主题的代码如下:
// Configurations for kafka consumer
val conf = ConfigFactory.parseResources("properties.conf")
val brokersip = conf.getString("enrichment.brokers.value")
val topics_in = conf.getString("enrichment.topics_in.value")
//
// Crea la sesion de Spark
val spark = SparkSession
.builder()
.master("yarn")
.appName("XY")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val properties = new Properties
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
properties.put("bootstrap.servers", brokersip)
properties.put("auto.offset.reset", "earliest")
properties.put("group.id", "XY")
val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe( util.Collections.singletonList("geoevents") )
Run Code Online (Sandbox Code Playgroud)
但是,每当我从命令行创建使用者以读取主题中的数据时,我都会获得前几天的所有事件。我运行的命令行命令是:
kafka-console-consumer --new-consumer --topic geoevents --from-beginning --bootstrap-server xx.yy.zz.xx
Run Code Online (Sandbox Code Playgroud)
为什么任何想法是我的代码行为就像是和无视我"earliest"的auto.offset.reset?
这是因为auto.offset.reset仅在该组没有提交的偏移量时才应用。
请参阅使用者配置文件:
当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时该怎么办
如果要从头开始,则可以:
使用新的群组名称(例如追加System.currentTimeMillis()到群组anme)
使用seekToBeginning()以下命令将使用者的位置明确移动到分区的开头:http : //kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning-java.util.Collection --
| 归档时间: |
|
| 查看次数: |
2794 次 |
| 最近记录: |