Raf*_*ima 5 elasticsearch logstash kibana-4
I am using the Logstash jdbc input plugin to read two (or more) databases and send the data to Elasticsearch, and Kibana 4 to visualize that data.
This is my Logstash config:
input {
  jdbc {
    type => "A"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table"
  }
  jdbc {
    type => "B"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table"
  }
}
filter {
}
output {
  if [type] == "A" {
    elasticsearch {
      host => "localhost"
      protocol => "http"
      index => "logstash-servera-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "B" {
    elasticsearch {
      host => "localhost"
      protocol => "http"
      index => "logstash-serverb-%{+YYYY.MM.dd}"
    }
  }
  stdout { codec => rubydebug }
}
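The problem is that every time I run Logstash, it starts saving all the data that is already in Elasticsearch again.
After one run with the where clause date > '2015-09-10', I stopped Logstash and ran it again (with --debug), this time using the 'special parameter' :sql_last_start. After Logstash started, it began showing this in the log: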
Executing JDBC query {:statement=>"SELECT \n\tSUBSTRING(R.RECEBEDOR, 1, 2) AS 'DDD',\nCASE WHEN R.STATUS <> 'RCON' AND R.COD_RESPOSTA in (428,429,230,425,430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO' \n WHEN R.STATUS = 'RCON' THEN 'SUCESSO'\n\t ELSE 'ERRO'\n END AS 'TIPO_MENSAGEM',\nAP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APLICACAO, RECEBEDOR, R.ID_OPERADORA, R.TIPO_PRODUTO \n\nFROM RECARGA R (NOLOCK)\nJOIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO \nwhere R.DT_RECARGA > :sql_last_start\nORDER BY R.DT_RECARGA ASC", :parameters=>{:sql_last_start=>2015-09-10 18:48:00 UTC}, :level=>:debug, :file=>"/DEV/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-jdbc-1.0.0/lib/logstash/plugin_mixins/jdbc.rb", :line=>"107", :method=>"execute_statement"}
This time I am using the 'real' statement:
SELECT
SUBSTRING(R.RECEBEDOR, 1, 2) AS 'DDD',
CASE WHEN R.STATUS <> 'RCON' AND R.COD_RESPOSTA in (428,429,230,425,430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO'
WHEN R.STATUS = 'RCON' THEN 'SUCESSO'
ELSE 'ERRO'
END AS 'TIPO_MENSAGEM',
AP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APLICACAO, RECEBEDOR, R.ID_OPERADORA
FROM RECARGA R (NOLOCK)
JOIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO
where R.DT_RECARGA > :sql_last_start
ORDER BY R.DT_RECARGA ASC
Does anyone know how to solve this?
Thanks!
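sql_last_start is now sql_last_value.
The special parameter sql_last_start has been renamed to sql_last_value for clarity, because it is no longer limited to datetime values and can hold other column types as well. So the solution could now look something like this: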
input {
  jdbc {
    type => "A"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    use_column_value => true
    tracking_column => "date"
    statement => "SELECT id, date, content, status from test_table WHERE date > :sql_last_value"
    # clean_run => true resets sql_last_value to zero, or to the initial value if the datatype is a date (the default is false)
    clean_run => false
  }
  jdbc {
    # for type B....
  }
}
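I have tested this with a SQL Server DB.
Please run it the first time with clean_run => true to avoid datatype errors, since during development values of different datatypes may have been stored in the sql_last_value variable.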
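Two details that the configuration above leaves open are how the tracked value is interpreted and where each input keeps its last-run state. The following is only a sketch, assuming a version of the jdbc input plugin that supports the tracking_column_type and last_run_metadata_path options; the two metadata file paths are hypothetical and should be adapted. Since the tracked column here is a date, the type is set to "timestamp" (the plugin otherwise treats the tracked column as numeric), and each input gets its own metadata file so that inputs A and B do not overwrite each other's saved value.

```
input {
  jdbc {
    type => "A"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    use_column_value => true
    tracking_column => "date"
    # The tracked column is a date, so compare it as a timestamp
    tracking_column_type => "timestamp"
    # Hypothetical path: one state file per input
    last_run_metadata_path => "C:\DEV\logstash\.jdbc_last_run_a"
    statement => "SELECT id, date, content, status from test_table WHERE date > :sql_last_value"
  }
  jdbc {
    type => "B"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    use_column_value => true
    tracking_column => "date"
    tracking_column_type => "timestamp"
    # Hypothetical path: kept separate from input A's state file
    last_run_metadata_path => "C:\DEV\logstash\.jdbc_last_run_b"
    statement => "SELECT id, date, content, status from test_table WHERE date > :sql_last_value"
  }
}
```

With separate state files, stopping and restarting Logstash should let each input resume from the last date value it saw in its own database.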
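By default, the jdbc input executes the configured SQL statement as-is. In your case, the statement selects everything from test_table. You need to instruct your SQL statement to load only the data added since the last time the jdbc input ran, by using the predefined parameter sql_last_start in your SQL query.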
input {
  jdbc {
    type => "A"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table WHERE date > :sql_last_start"
  }
  jdbc {
    type => "B"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table WHERE date > :sql_last_start"
  }
}
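Also, if the same record happens to be loaded twice from the DB and you don't want duplicates to be created in your ES server, you can use the record ID as the document ID in your elasticsearch output. That way the document will be updated in ES instead of being duplicated.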
output {
  if [type] == "A" {
    elasticsearch {
      host => "localhost"
      protocol => "http"
      index => "logstash-servera-%{+YYYY.MM.dd}"
      # same id as in DB
      document_id => "%{id}"
    }
  }
  if [type] == "B" {
    elasticsearch {
      host => "localhost"
      protocol => "http"
      index => "logstash-serverb-%{+YYYY.MM.dd}"
      # same id as in DB
      document_id => "%{id}"
    }
  }
  stdout { codec => rubydebug }
}
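In later versions of the elasticsearch output plugin (Logstash 2.x and up, as far as I know), the host and protocol options were replaced by a single hosts option. A minimal sketch of the same output under that assumption, with Elasticsearch assumed to be listening on localhost:9200:

```
output {
  if [type] == "A" {
    elasticsearch {
      # Assumed address; hosts takes the place of host + protocol
      hosts => ["localhost:9200"]
      index => "logstash-servera-%{+YYYY.MM.dd}"
      # Reuse the database ID as the Elasticsearch document ID
      document_id => "%{id}"
    }
  }
  if [type] == "B" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "logstash-serverb-%{+YYYY.MM.dd}"
      document_id => "%{id}"
    }
  }
  stdout { codec => rubydebug }
}
```

Keep in mind that document_id only prevents duplicates within a single index. With a daily index pattern such as %{+YYYY.MM.dd}, a record that is reloaded on a different day may end up in a different index, unless the event's @timestamp is taken from the record's own date.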