具有多个 kafka 主题输入的 logstash

Op *_*ule 2 logstash

我想使用以下设置启动一个 logstash 实例:

input {
  kafka {
    topic_id => "topic_a"
    .......
  }
  kafka {
    topic_id => "topic_b"
    .......
  }

}
filter {
  json {
    source => "message"
  }
  uuid {
    target => "@uuid"
  }
  mutate {
    replace => { "message" => "%{message}" } # want to get the full json literal but does not work
    add_field => {
      "topic" => "%{topic_id}" # it does not work either
    }
  }

  # logic to apply different filter base on topic_id
  if [topic_id] =~ 'topic_a' { # this block seems never entered        
    mutate {
       replace => { "topic" => "topic_a" }
    }
  } else {
    .....
  }
}
output {
  .....
}
Run Code Online (Sandbox Code Playgroud)

我的 Kibana 上的输出应该如下所示:

topic : %{topic_id}
Run Code Online (Sandbox Code Playgroud)

提示上面的配置无法提取topic_id。我不知道如何配置过滤器部分。任何人都可以对此提供提示吗?谢谢。

顺便说一句,我正在使用 logstash-2.2.2

编辑:根据logstash文档更新配置,结果还是一样

小智 5

默认情况下,Kafka Input Plugin 不包含元数据信息,例如: topic_id ..
您必须启用decorate_events选项:

kafka {
    topic_id => "topic_a"
    decorate_events => true
  }
Run Code Online (Sandbox Code Playgroud)

完成后,您可以在kafkatopic键的数组中找到您的 topic_id 。

装饰事件

值类型为布尔值 默认值为 false 将 Kafka 元数据(如主题、消息大小)添加到事件的选项。这将向包含以下属性的 logstash 事件添加一个名为 kafka 的字段: msg_size:以字节为单位的此消息的完整序列化大小(包括 crc、标头属性等) topic:此消息与 consumer_group 关联的主题:消费者组用于在此事件分区中读取:此消息与密钥关联的分区:包含消息密钥的 ByteBuffer https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins -inputs-kafka-decorate_events