Logstash MySQL组合了多行

Kar*_*k H 1 elasticsearch logstash

我一直在使用logstash,并且能够将我的一些MySQL表插入到Elastic搜索中.

现在,我需要将MySQL Query statememt中的多行组合成一个文档.例如,假设我有一个mySQL查询,我有以下结果,

1   Group1  Company1    User1
1   Group1  Company1    User2
1   Group2  Company2    User3
1   Group2  Company2    User4
Run Code Online (Sandbox Code Playgroud)

我想在Elastic Search for Group1和Group2中创建2个Group文档.在Group1中,我需要添加User1和User2.在Group2中,我需要添加User3和User4.

这可能使用LogStash并结合一些过滤器吗?如果是这样,任何人都可以指出我正确的方向.

小智 6

您可以使用logstash-filter-aggregate.task_id在您的情况下,过滤器采用组字段,并聚合具有相同task_id值的行.因此,将聚合具有group1的事件,然后当筛选器检测到新的组值时,group1的聚合用户数据将作为单个Logstash事件推送,以输出到您的elasticsearch文档.然后,它使用group2启动该过程.

确保logstash过滤器工作程序设置为1(-w 1标记或在配置文件中),并且行的排序方式是所有group1事件都进入,然后是所有group2等.否则事件可能会不按顺序处理会有意想不到的结果/数据丢失.

filter {
  aggregate {
    task_id => "%{group}"
    code => "
      map['users'] ||= []
      map['users'].push(event.get('user'))
    "
    push_previous_map_as_event => true
    timeout_tags => ['aggregated']
  }

  if "aggregated" not in [tags] {
    drop {}
  }
}
Run Code Online (Sandbox Code Playgroud)

结果将如下所示:

"_source": {
  "group": "group1",
  "users": ["user1", "user2"]
}
Run Code Online (Sandbox Code Playgroud)