Logstash:将两个日志合并为一个输出文档

Loi*_*ros 6 elasticsearch logstash

我已将syslog设置为使用以下过滤器将日志发送到logstash:

output {
  elasticsearch 
  { hosts => ["localhost:9200"]
  document_id => "%{job_id}"   
}

}
filter {
    grok {
        overwrite => ["message"]
    }
    json {
     source => "message"
    }
}
Run Code Online (Sandbox Code Playgroud)

我的一个应用程序的典型消息将具有初始状态和job_id:

{"job_id": "xyz782", state: "processing", job_type: "something"}
Run Code Online (Sandbox Code Playgroud)

大约几分钟后,另一个日志将具有相同的log_id,不同的状态和处理时间:

{"job_id": "xyz782", state:"failed", processing_time: 12.345}
Run Code Online (Sandbox Code Playgroud)

正确加载这些字段,但会创建两个文档.我希望只为初始日志创建一个文档,而第二个日志则更新第一个文档,这意味着更新后的文档将包含以下字段:

{"job_id": "xyz782", state: "failed", job_type: "something", processing_time: 12.345}
Run Code Online (Sandbox Code Playgroud)

正如您在我的logstash conf输出中所看到的,我使用job_id作为文档ID,但是,第二条消息似乎替换了第一条消息中的字段,但也删除了第一条消息中不在的第一条消息中的所有字段.例如,第二个消息中出现的job_type字段没有出现在最终文档中.这可能与json两次来自相同字段"消息"的事实有关.是否有另一种方法可以将两个日志消息合并到logstash中的一个文档中?

Val*_*Val 5

您可以使用aggregate过滤器来执行此操作.聚合过滤器支持基于公共字段值将多个日志行聚合到一个单个事件中.在您的情况下,公共字段将是job_id字段.

然后我们需要另一个字段来检测第一个事件和第二个应该聚合的事件.在你的情况下,这将是state领域.

所以你只需要在现有的Logstash配置中添加另一个过滤器,如下所示:

filter {
    ...your other filters

    if [state] == "processing" {
        aggregate {
            task_id => "%{job_id}"
        }
    } else if [state] == "failed" {
        aggregate {
            task_id => "%{job_id}"
            end_of_task => true
            timeout => 120
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

您可以timeout根据作业的运行时间自由调整(以秒为单位).