Abh*_*eet 5 input apache-kafka logstash
我正在尝试从多个主题过滤kafka事件,但是一旦一个主题的所有事件都被过滤,logstash就无法从另一个kafka主题获取事件。我正在使用具有3个分区和2个复制的主题,这是我的logstash配置文件
input {
kafka{
auto_offset_reset => "smallest"
consumer_id => "logstashConsumer1"
topic_id => "unprocessed_log1"
zk_connect=>"192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
type => "kafka_type_1"
}
kafka{
auto_offset_reset => "smallest"
consumer_id => "logstashConsumer1"
topic_id => "unprocessed_log2"
zk_connect => "192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
type => "kafka_type_2"
}
}
filter{
if [type] == "kafka_type_1"{
csv {
separator=>" "
source => "data"
}
}
if [type] == "kafka_type_2"{
csv {
separator => " "
source => "data"
}
}
}
output{
stdout{ codec=>rubydebug{metadata => true }}
}
Run Code Online (Sandbox Code Playgroud)
小智 6
这是一个很晚的回复,但是如果您想输入多个主题并输出到另一个 kafka 多个输出,您可以执行以下操作:
input {
kafka {
topics => ["topic1", "topic2"]
codec => "json"
bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092"
decorate_events => true
group_id => "logstash-multi-topic-consumers"
consumer_threads => 5
}
}
output {
if [kafka][topic] == "topic1" {
kafka {
codec => "json"
topic_id => "new_topic1"
bootstrap_servers => "output-kafka-1:9092"
}
}
else if [kafka][topic] == "topic2" {
kafka {
codec => "json"
topic_id => "new_topic2"
bootstrap_servers => "output-kafka-1:9092"
}
}
}
Run Code Online (Sandbox Code Playgroud)
在详细说明您的引导服务器时要小心,并给出您的 kafka 广告侦听器的名称。
Ref-1:https : //www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-group_id
Ref-2:https : //www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-decorate_events