Logstash索引JSON数组

JP.*_*JP. 13 json logstash

Logstash太棒了.我可以像这样发送JSON(多行可读性):

{
  "a": "one"
  "b": {
    "alpha":"awesome"
  }
}
Run Code Online (Sandbox Code Playgroud)

然后使用搜索词在kibana中查询该行b.alpha:awesome.尼斯.

但是我现在有一个像这样的JSON日志行:

{
  "different":[
    {
      "this": "one",
      "that": "uno"
    },
    {
      "this": "two"
    }
  ]
}
Run Code Online (Sandbox Code Playgroud)

我希望能够找到像different.this:two (或different.this:one,或different.that:uno)这样的搜索行

如果我直接使用Lucene,我会遍历different数组,并为其中的每个哈希生成一个新的搜索索引,但Logstash目前似乎像这样摄取该行:

不同:{this:one,that:uno},{this:two}

这不会帮助我使用different.this或搜索日志行different.that.

我是否有任何关于编解码器,过滤器或代码更改的想法,我可以做到这一点?

vza*_*llo 3

您可以编写自己的过滤器(复制粘贴、重命名类名、config_name重写filter(event)方法)或修改当前的JSON过滤器( Github 上的源代码

logstash-1.x.x\lib\logstash\filters您可以在以下名为 的路径中找到 JSON 过滤器(Ruby 类)源代码json.rb。JSON过滤器将内容解析为JSON如下

begin
  # TODO(sissel): Note, this will not successfully handle json lists
  # like your text is '[ 1,2,3 ]' JSON.parse gives you an array (correctly)
  # which won't merge into a hash. If someone needs this, we can fix it
  # later.
  dest.merge!(JSON.parse(source))

  # If no target, we target the root of the event object. This can allow
  # you to overwrite @timestamp. If so, let's parse it as a timestamp!
  if !@target && event[TIMESTAMP].is_a?(String)
    # This is a hack to help folks who are mucking with @timestamp during
    # their json filter. You aren't supposed to do anything with
    # "@timestamp" outside of the date filter, but nobody listens... ;)
    event[TIMESTAMP] = Time.parse(event[TIMESTAMP]).utc
  end

  filter_matched(event)
rescue => e
  event.tag("_jsonparsefailure")
  @logger.warn("Trouble parsing json", :source => @source,
               :raw => event[@source], :exception => e)
  return
end
Run Code Online (Sandbox Code Playgroud)

可以修改解析过程来修改原来的JSON

  json  = JSON.parse(source)
  if json.is_a?(Hash)
    json.each do |key, value| 
        if value.is_a?(Array)
            value.each_with_index do |object, index|
                #modify as you need
                object["index"]=index
            end
        end
    end
  end
  #save modified json
  ......
  dest.merge!(json)
Run Code Online (Sandbox Code Playgroud)

然后你可以修改你的配置文件以使用/你的新的/修改的 JSON 过滤器并放置在\logstash-1.x.x\lib\logstash\config

这是我的elastic_with_json.conf,经过修改的json.rb过滤器

input{
    stdin{

    }
}filter{
    json{
        source => "message"
    }
}output{
    elasticsearch{
        host=>localhost
    }stdout{

    }
}
Run Code Online (Sandbox Code Playgroud)

如果您想使用新的过滤器,您可以使用以下命令对其进行配置config_name

class LogStash::Filters::Json_index < LogStash::Filters::Base

  config_name "json_index"
  milestone 2
  ....
end
Run Code Online (Sandbox Code Playgroud)

并配置它

input{
    stdin{

    }
}filter{
    json_index{
        source => "message"
    }
}output{
    elasticsearch{
        host=>localhost
    }stdout{

    }
}
Run Code Online (Sandbox Code Playgroud)

希望这可以帮助。