我们想为我们公司的几个不同项目设置logstash服务器.现在我尝试在Kibana中启用它们.我的问题是:如果我有不同的日志文件模式,我如何为它们构建过滤器?示例:logstash.conf:
input {
file {
type => "A"
path => "/home/logstash/A/*"
start_position => "beginning"
}
file {
type => "B"
path => "/home/logstash/B*"
start_position => "beginning"
}
}
filter {
multiline {
pattern => "^%{TIMESTAMP_ISO8601}"
negate => true
what => "previous"
}
grok {
type => A
match => [ "message", "%{TIMESTAMP_ISO8601:logdate} %{DATA:thread %{LOGLEVEL:level}\s*%{DATA:logger_name}\s*-\s*%{GREEDYDATA:log_text}"]
add_tag => [ "level_%{level}" ]
}
date {
match => ["logdate", "YYYY-MM-dd HH:mm:ss,SSS"]
}
grok {
type => B
match => [ any other pattern ... …Run Code Online (Sandbox Code Playgroud) 在我的logstash日志中,我有时空行或只有空格的行.
要删除空行,我创建了一个dropemptyline过滤器文件
# drop empty lines
filter {
if [message] =~ /^\s*$/ {
drop { }
}
}
Run Code Online (Sandbox Code Playgroud)
但是空行过滤器没有按预期工作,主要是因为这个特定的过滤器在链中的其他过滤器之后有过滤器.
00_input.conf
05_syslogfilter.conf
06_dropemptylines.conf
07_classifier.conf
Run Code Online (Sandbox Code Playgroud)
所以我认为我的特定过滤器可以工作,如果它是唯一的,但不是.
2015-02-11 15:02:12.347 WARN 1 --- [tp1812226644-23] o.eclipse.jetty.servlet.ServletHandler :
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is org.springframework.dao.DataAccessResourceFailureException: Timed out after 10000 ms while waiting for a server that matches AnyServerSelector{}. Client view of cluster state is {type=Unknown, servers=[{address=mongo:27017, type=Unknown, state=Connecting, exception={com.mongodb.MongoException$Network: Exception opening the socket}, caused by {java.net.UnknownHostException: mongo: unknown error}}]; nested exception is com.mongodb.MongoTimeoutException: Timed out …Run Code Online (Sandbox Code Playgroud) 我有一个日志文件,其中数据由管道符号分隔."|".一个例子如下.有谁知道如何编写GROK模式来提取它为logstash?
2014-01-07 11:58:48.7694 | LOGLEVEL | LOGSOURCE | LOGMESSAGE
我在我的系统上设置了Elasticsearch,Logstash,Kibana日志查看工具.我的配置现在有2台机器(Amazon EC2实例):
在logstash-server上,这是我的配置(在不同的文件中)的样子: -
input {
lumberjack {
port => 5000
type => "logs"
ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
}
}
filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
} …Run Code Online (Sandbox Code Playgroud) elasticsearch logstash kibana logstash-grok logstash-forwarder
我是ELK堆栈的新用户.我正在使用UWSGI作为我的服务器.我需要使用Grok解析我的uwsgi日志,然后分析它们.
这是我的日志格式: -
[pid: 7731|app: 0|req: 357299/357299] ClientIP () {26 vars in 511 bytes} [Sun Mar 1 07:47:32 2015] GET /?file_name=123&start=0&end=30&device_id=abcd&verif_id=xyzsghg => generated 28 bytes in 1 msecs (HTTP/1.0 200) 2 headers in 79 bytes (1 switches on core 0)
Run Code Online (Sandbox Code Playgroud)
我使用此链接生成我的过滤器,但它没有解析大部分信息.
上面链接生成的过滤器是
%{SYSLOG5424SD} %{IP} () {26 vars in 511 bytes} %{SYSLOG5424SD} GET %{URIPATHPARAM} => generated 28 bytes in 1 msecs (HTTP%{URIPATHPARAM} 200) 2 headers in 79 bytes (1 switches on core 0)
Run Code Online (Sandbox Code Playgroud)
这是我的logstash-conf文件.
input { stdin { } …Run Code Online (Sandbox Code Playgroud) 我一直在尝试使用logstash解析我的python traceback日志.我的日志看起来像这样:
[pid: 26422|app: 0|req: 73/73] 192.168.1.1 () {34 vars in 592 bytes} [Wed Feb 18 13:35:55 2015] GET /data => generated 2538923 bytes in 4078 msecs (HTTP/1.1 200) 2 headers in 85 bytes (1 switches on core 0)
Traceback (most recent call last):
File "/var/www/analytics/parser.py", line 257, in parselogfile
parselogline(basedir, lne)
File "/var/www/analytics/parser.py", line 157, in parselogline
pval = understandpost(parts[3])
File "/var/www/analytics/parser.py", line 98, in understandpost
val = json.loads(dct["events"])
File "/usr/lib/python2.7/json/__init__.py", line 338, in loads
return _default_decoder.decode(s)
File "/usr/lib/python2.7/json/decoder.py", …Run Code Online (Sandbox Code Playgroud) 我正在检查我们服务器上的nginx错误日志,发现它们的日期格式为:
2015/08/30 05:55:20
Run Code Online (Sandbox Code Playgroud)
即YYYY/MM/DD HH:mm:ss.我试图找到一个现有的grok日期模式,这可能有助于我快速解析,但遗憾的是找不到任何这样的日期格式.最终,我不得不把模式写成:
%{YEAR}/%{MONTHNUM}/%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
Run Code Online (Sandbox Code Playgroud)
我仍然希望是否有更短的模式?
我被要求将我们的log4j日志文件(暂时不使用Socket调用)整合到Logstash JSON文件中,然后我将其提供给Elasticsearch.我们的代码使用RollingFileAppender.这是一个示例日志条目.
2016-04-22 16:43:25,172 ERROR :SomeUser : 2 [com.mycompany.SomeClass] AttributeSchema 'Customer |Customer |Individual|Individual|Quarter|Date' : 17.203 The Log Message.
Run Code Online (Sandbox Code Playgroud)
这是我们的log4j.properties文件中的ConversionPattern值
<param name="ConversionPattern" value="%d{ISO8601} %p %x %X{username}:%t [%c] %m %n" />
Run Code Online (Sandbox Code Playgroud)
有人可以帮我写一个解析该行的Logstash Grok过滤器吗?到目前为止,我有以下内容
filter {
if [type] == "log4j" {
grok {
match => ["message", "%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:messsage}"]
}
date {
match => ["logdate", "yyyy-MM-dd HH:mm:ss,SSS", "ISO8601"]
}
}
}
Run Code Online (Sandbox Code Playgroud)
但当然,它将优先级之后的所有内容作为消息.我想进一步隔离以下字段(在Log4j模式布局中定义)
我一直在关注本指南:
http://deviantony.wordpress.com/2014/06/04/logstash-debug-configuration/
我希望能帮助我测试我的logstash过滤器,看看我是否在全职使用之前得到了所需的输出.
作为指南的一部分,它告诉您设置输入和输出,然后设置过滤器文件.输入似乎工作正常:
input {
stdin { }
}
Run Code Online (Sandbox Code Playgroud)
输出是这样的:
output {
stdout {
codec => json
}
file {
codec => json
path => /tmp/debug-filters.json
}
}
Run Code Online (Sandbox Code Playgroud)
当我尝试运行logstash进程时,我收到以下错误(这里我用--configtest运行它,因为错误建议我尝试一下,但它不提供任何更多信息):
# /opt/logstash/bin/logstash -f /etc/logstash/debug.d -l /var/log/logstash/logstash-debug.log --configtest
Sending logstash logs to /var/log/logstash/logstash-debug.log.
Error: Expected one of #, ", ', -, [, { at line 21, column 17 (byte 288) after output {
stdout {
codec => json
}
file {
codec => json
path =>
Run Code Online (Sandbox Code Playgroud)
我已经尝试删除输出中的文件部分,我可以运行logstash进程,但是当我将日志行粘贴到shell时,我没有看到日志条目分解为我希望grok过滤器的组件打破它.我这样做的全部是:
Oct 30 08:57:01 VERBOSE[1447] …Run Code Online (Sandbox Code Playgroud) 目前我在我的logstash配置文件中做了类似的事情:
filter {
...
mutate {
...
convert => {
"blahId" => "integer"
"blahblahId" => "integer"
...
...
"b...blahId" => "integer"
}
...
}
...
}
Run Code Online (Sandbox Code Playgroud)
所以基本上我想将所有以"Id"结尾的字段转换为整数.有没有办法在一行中做到这一点?这样的事情"*Id" => "integer"是什么?
编辑:我试过了
convert => {
"*Id" => "integer"
}
Run Code Online (Sandbox Code Playgroud)
正如我所料,没有奏效.
也许使用红宝石过滤器?