Logstash 5.1.1 kafka输入不会获取有关主题的现有消息

Fiz*_*izi 2 elasticsearch apache-kafka logstash apache-zookeeper

我有以下logstash配置与kafka输入

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["mytopic"]
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my_index"
    codec => "json"
    document_id => "%{id}"
    doc_as_upsert => true
    action => "update"
  }
}
Run Code Online (Sandbox Code Playgroud)

我面临的问题是,当我运行logstash时,它不会收到有关该主题的旧消息.我的印象是,第一次运行logstash时,它会获取有关尚未消耗的主题的所有消息.我检查过这是一个新主题,并且在其中有消息,当它开始运行时没有被logstash接收.它确实会在主题运行时收到有关主题的消息,但不包括在其开始之前存在的消息.我在配置中遗漏了什么,或者它是输入本身的怪癖.消息的保证对我的业务需求至关重要.

oh5*_*h54 7

由于您尚未为kafka指定组ID,因此需要考虑的因素如下:

  • kafka group.id(logstash kafka配置中的group_id)设置为logstash的默认值,即"logstash"
  • logstash中enable.auto.commit(enable_auto_commit)的默认Kafka值为"true"
  • Kafka auto.offset.reset(auto_offset_reset)在logstash中没有默认值,因此我假设使用了Kafka默认值latest.

因此,当您针对某个主题运行使用者并且无法获取该主题中已有的消息时,可能会发生以下两种情况之一:

  1. 没有与消费者具有相同组ID的现有组,因此使用最新的Kafka默认auto.offset.reset值,并且消费者将忽略现有消息.
  2. 存在具有相同组ID的现有组("logstash"),并且具有该组ID的一些消费者已经消耗了现有消息并且提交了偏移量(这个其他消费者可能是您之前或其他一些消费者所使用的消费者.同组ID).这意味着该组下的其他消费者不会重新消费这些消息,除非明确告知这样做.

所以你可能想要做的就是设置一些Kafka配置,对于你应该能够设置的logstash

group_id =>"some_random_group"

auto_offset_reset =>"最早"

如果您现在运行使用者,因为some_random_group没有现有的偏移量并且重置是最早的,所以使用者应该使用主题中的所有现有消息并提交偏移量.这意味着如果在消耗了所有消息后再次运行消费者,它将不会消耗现有消息.