我想使用以下设置启动一个 logstash 实例:
input {
kafka {
topic_id => "topic_a"
.......
}
kafka {
topic_id => "topic_b"
.......
}
}
filter {
json {
source => "message"
}
uuid {
target => "@uuid"
}
mutate {
replace => { "message" => "%{message}" } # want to get the full json literal but does not work
add_field => {
"topic" => "%{topic_id}" # it does not work either
}
}
# logic to apply different filter base on topic_id
if [topic_id] =~ 'topic_a' { # this block seems never entered
mutate {
replace => { "topic" => "topic_a" }
}
} else {
.....
}
}
output {
.....
}
Run Code Online (Sandbox Code Playgroud)
我的 Kibana 上的输出应该如下所示:
topic : %{topic_id}
Run Code Online (Sandbox Code Playgroud)
提示上面的配置无法提取topic_id。我不知道如何配置过滤器部分。任何人都可以对此提供提示吗?谢谢。
顺便说一句,我正在使用 logstash-2.2.2
编辑:根据logstash文档更新配置,结果还是一样
小智 5
默认情况下,Kafka Input Plugin 不包含元数据信息,例如: topic_id ..
您必须启用decorate_events选项:
kafka {
topic_id => "topic_a"
decorate_events => true
}
Run Code Online (Sandbox Code Playgroud)
完成后,您可以在kafka带topic键的数组中找到您的 topic_id 。
装饰事件
值类型为布尔值 默认值为 false 将 Kafka 元数据(如主题、消息大小)添加到事件的选项。这将向包含以下属性的 logstash 事件添加一个名为 kafka 的字段: msg_size:以字节为单位的此消息的完整序列化大小(包括 crc、标头属性等) topic:此消息与 consumer_group 关联的主题:消费者组用于在此事件分区中读取:此消息与密钥关联的分区:包含消息密钥的 ByteBuffer https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins -inputs-kafka-decorate_events
| 归档时间: |
|
| 查看次数: |
3644 次 |
| 最近记录: |