Logstash输入jdbc是重复的结果

Raf*_*ima 5 elasticsearch logstash kibana-4

我正在使用logstash输入jdbc插件来读取两个(或更多)数据库并将数据发送到elasticsearch,并使用kibana 4来虚拟化这些数据.

这是我的logstash配置:

input {
  jdbc {
    type => "A"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table"
  }

jdbc {
    type => "B"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table"
  }
}
filter {

}
output {

    if [type] == "A" {
        elasticsearch {
            host => "localhost"
            protocol => http
            index => "logstash-servera-%{+YYYY.MM.dd}"
        }    
    }
    if [type] == "B" {
        elasticsearch {
            host => "localhost"
            protocol => http
            index => "logstash-serverb-%{+YYYY.MM.dd}"
        }    
    }

  stdout { codec => rubydebug }
}
Run Code Online (Sandbox Code Playgroud)

问题是每次运行logstash时,它都会开始保存已经在弹性搜索中的所有数据.

在使用where子句= date>'2015-09-10'运行后,我停止了logstash并使用'special parameter'再次运行(带--debug):sql_last_date.logstash启动后它开始在日志中显示:

?[36mExecuting JDBC query {:statement=>"SELECT \n\tSUBSTRING(R.RECEBEDOR, 1, 2)
AS 'DDD',\nCASE WHEN R.STATUS <>  'RCON' AND R.COD_RESPOSTA in (428,429,230,425,
430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO'  \n       W
HEN R.STATUS = 'RCON' THEN 'SUCESSO'\n\t   ELSE 'ERRO'\n   END AS 'TIPO_MENSAGEM
',\nAP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC
_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APL
ICACAO, RECEBEDOR, R.ID_OPERADORA, R.TIPO_PRODUTO \n\nFROM RECARGA R (NOLOCK)\nJ
OIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO \nwhere R.DT_RECARGA > :sql
_last_start\nORDER BY R.DT_RECARGA ASC", :parameters=>{:sql_last_start=>2015-09-
10 18:48:00 UTC}, :level=>:debug, :file=>"/DEV/logstash-1.5.4/vendor/bundle/jrub
y/1.9/gems/logstash-input-jdbc-1.0.0/lib/logstash/plugin_mixins/jdbc.rb", :line=
>"107", :method=>"execute_statement"}?[0m
Run Code Online (Sandbox Code Playgroud)

这次我使用的是'真实'语句:

SELECT 
    SUBSTRING(R.RECEBEDOR, 1, 2) AS 'DDD',
CASE WHEN R.STATUS <>  'RCON' AND R.COD_RESPOSTA in (428,429,230,425,430,427,418,422,415,424,214,433,435,207,426) THEN 'REGRA DE NEGÓCIO'  
       WHEN R.STATUS = 'RCON' THEN 'SUCESSO'
       ELSE 'ERRO'
   END AS 'TIPO_MENSAGEM',
AP.ALIAS as 'CANAL', R.ID_RECARGA, R.VALOR, R.STATUS, R.COD_RESPOSTA, R.DESC_RESPOSTA, R.DT_RECARGA as '@timestamp', R.ID_CLIENTE, R.ID_DEPENDENTE, R.ID_APLICACAO, RECEBEDOR, R.ID_OPERADORA

FROM RECARGA R (NOLOCK)
JOIN APLICACAO AP ON R.ID_APLICACAO = AP.ID_APLICACAO 
where R.DT_RECARGA > :sql_last_start
ORDER BY R.DT_RECARGA ASC
Run Code Online (Sandbox Code Playgroud)

谁知道怎么解决?

谢谢!

Rag*_*dra 6

sql_last_start现在sql_last_value请在此处查看 特殊参数sql_last_start现在已重命名sql_last_value 为更清晰,因为它不仅限于日期时间,还可能具有其他列类型.所以现在解决方案可能是这样的

input {
jdbc {
     type => "A"
     jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-  jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
     jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
     jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
     jdbc_user => "user"
     jdbc_password => "pass"
     schedule => "5 * * * *"
     use_column_value => true
     tracking_column => date
     statement => "SELECT id, date, content, status from test_table WHERE date >:sql_last_value"
    #clean_run true means it will reset sql_last_value to zero or initial value if datatype is date(default is also false)
     clean_run =>false
   }
jdbc{
  #for type B....
  }
}
Run Code Online (Sandbox Code Playgroud)

我已经使用sql Server DB进行了测试

请运行第一次使用clean_run => ture以避免数据类型错误,而在开发中我们可能有不同的数据类型值存储在sql_last_value变量中


Val*_*Val 4

默认情况下,jdbc输入将执行配置的 SQL 语句。就您而言,您的语句选择 中的所有内容test_table。您需要指示您的 SQL 语句仅通过在 SQL 查询中jdbc使用预定义参数来加载上次运行输入时的数据。sql_last_start

input {
  jdbc {
    type => "A"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp1;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table WHERE date > :sql_last_start"
  }

jdbc {
    type => "B"
    jdbc_driver_library => "C:\DEV\elasticsearch-1.7.1\plugins\elasticsearch-jdbc-1.7.1.0\lib\jtds-1.3.1.jar"
    jdbc_driver_class => "Java::net.sourceforge.jtds.jdbc.Driver"
    jdbc_connection_string => "jdbc:jtds:sqlserver://dev_data_base_server:1433/dbApp2;domain=CORPDOMAIN;useNTLMv2=true"
    jdbc_user => "user"
    jdbc_password => "pass"
    schedule => "5 * * * *"
    statement => "SELECT id, date, content, status from test_table WHERE date > :sql_last_start"
  }
}
Run Code Online (Sandbox Code Playgroud)

此外,如果碰巧同一条记录从数据库加载两次,并且您不希望在 ES 服务器中创建重复项,您还可以指定使用记录 ID 作为输出中的文档 ID elasticsearch,这样文档将在 ES 中更新且不会重复。

output {

    if [type] == "A" {
        elasticsearch {
            host => "localhost"
            protocol => http
            index => "logstash-servera-%{+YYYY.MM.dd}"
            document_id => "%{id}"       <--- same id as in DB
        }    
    }
    if [type] == "B" {
        elasticsearch {
            host => "localhost"
            protocol => http
            index => "logstash-serverb-%{+YYYY.MM.dd}"
            document_id => "%{id}"       <--- same id as in DB
        }    
    }

  stdout { codec => rubydebug }
}
Run Code Online (Sandbox Code Playgroud)