Logstash with multiple Kafka inputs

Abh*_*eet 5 input apache-kafka logstash

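I am trying to filter Kafka events from multiple topics, but once all the events from one topic have been filtered, Logstash cannot fetch events from the other Kafka topic. My topics have 3 partitions and a replication factor of 2. Here is my Logstash config file: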

input {
    kafka {
        auto_offset_reset => "smallest"
        consumer_id => "logstashConsumer1"
        topic_id => "unprocessed_log1"
        zk_connect => "192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
        type => "kafka_type_1"
    }
    kafka {
        auto_offset_reset => "smallest"
        consumer_id => "logstashConsumer1"
        topic_id => "unprocessed_log2"
        zk_connect => "192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
        type => "kafka_type_2"
    }
}
filter {
    if [type] == "kafka_type_1" {
        csv {
            separator => " "
            source => "data"
        }
    }
    if [type] == "kafka_type_2" {
        csv {
            separator => " "
            source => "data"
        }
    }
}
output {
    stdout { codec => rubydebug { metadata => true } }
}

小智 6

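This is a very late reply, but if you want to read from multiple topics and write to multiple topics on another Kafka cluster, you can do something like this: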

input {
  kafka {
    topics => ["topic1", "topic2"]
    codec => "json"
    bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092"
    # Adds Kafka metadata, including the source topic, to each event
    decorate_events => true
    group_id => "logstash-multi-topic-consumers"
    consumer_threads => 5
  }
}

output {
  # Route each event by its source topic.
  # Note: newer plugin versions expose this as [@metadata][kafka][topic].
  if [kafka][topic] == "topic1" {
    kafka {
      codec => "json"
      topic_id => "new_topic1"
      bootstrap_servers => "output-kafka-1:9092"
    }
  }
  else if [kafka][topic] == "topic2" {
    kafka {
      codec => "json"
      topic_id => "new_topic2"
      bootstrap_servers => "output-kafka-1:9092"
    }
  }
}
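
When the output topic name can be derived from the input topic name, the conditional above can probably be collapsed into a single output. The following is only a sketch under two assumptions: that your plugin version stores the decorated metadata under `[@metadata][kafka]` (older versions use a top-level `[kafka]` field instead), and that `topic_id` accepts a field reference. The `new_` prefix simply mirrors the topic names used above.

```conf
output {
  kafka {
    codec => "json"
    # Assumption: the source topic is available at [@metadata][kafka][topic].
    # On older plugin versions, try %{[kafka][topic]} instead.
    topic_id => "new_%{[@metadata][kafka][topic]}"
    bootstrap_servers => "output-kafka-1:9092"
  }
}
```

With this form, adding a third input topic would not require another `else if` branch, but every input topic is then forwarded, so keep the explicit conditionals if only some topics should be routed.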

Be careful when specifying your bootstrap servers: use the host names given in your Kafka advertised listeners.

Ref-1: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-group_id

Ref-2: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-decorate_events

  • Answering my own question: looking at the [source code](https://sourcegraph.com/github.com/logstash-plugins/logstash-input-kafka@master/-/blob/lib/logstash/inputs/kafka.rb#L249), it looks like each thread reads from both topics. (2 upvotes)
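
If each topic should get its own dedicated consumer threads rather than sharing them, one option is to declare a separate `kafka` input per topic. This is a minimal sketch, not a tested setup: the `group_id` and `client_id` values are hypothetical names, and `consumer_threads => 3` is chosen only to match the 3 partitions mentioned in the question.

```conf
input {
  kafka {
    topics => ["topic1"]
    bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092"
    group_id => "logstash-topic1"      # hypothetical name
    client_id => "logstash-topic1"     # distinct per input
    consumer_threads => 3              # one thread per partition
    decorate_events => true
  }
  kafka {
    topics => ["topic2"]
    bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092"
    group_id => "logstash-topic2"      # hypothetical name
    client_id => "logstash-topic2"     # distinct per input
    consumer_threads => 3
    decorate_events => true
  }
}
```

Giving each input its own `client_id` keeps the two consumers distinguishable, and separate `group_id` values mean the offsets and rebalances of one topic do not affect the other.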