Fiz*_*izi 2 elasticsearch apache-kafka logstash apache-zookeeper
我有以下logstash配置与kafka输入
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["mytopic"]
}
}
filter {
json {
source => "message"
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["localhost:9200"]
index => "my_index"
codec => "json"
document_id => "%{id}"
doc_as_upsert => true
action => "update"
}
}
Run Code Online (Sandbox Code Playgroud)
我面临的问题是,当我运行logstash时,它不会收到有关该主题的旧消息.我的印象是,第一次运行logstash时,它会获取有关尚未消耗的主题的所有消息.我检查过这是一个新主题,并且在其中有消息,当它开始运行时没有被logstash接收.它确实会在主题运行时收到有关主题的消息,但不包括在其开始之前存在的消息.我在配置中遗漏了什么,或者它是输入本身的怪癖.消息的保证对我的业务需求至关重要.
由于您尚未为kafka指定组ID,因此需要考虑的因素如下:
因此,当您针对某个主题运行使用者并且无法获取该主题中已有的消息时,可能会发生以下两种情况之一:
所以你可能想要做的就是设置一些Kafka配置,对于你应该能够设置的logstash
group_id =>"some_random_group"
auto_offset_reset =>"最早"
如果您现在运行使用者,因为some_random_group没有现有的偏移量并且重置是最早的,所以使用者应该使用主题中的所有现有消息并提交偏移量.这意味着如果在消耗了所有消息后再次运行消费者,它将不会消耗现有消息.
| 归档时间: |
|
| 查看次数: |
2101 次 |
| 最近记录: |