logstach:jdbc_page_size不会将我的所有数据转储到弹性搜索

wos*_*tom 6 database postgresql dump elasticsearch logstash

我想将tom_test2 postgresql表导出到弹性搜索.该表有176805行:

=> select count(*) from tom_test2;
 count  
--------
 176805
(1 row)
Run Code Online (Sandbox Code Playgroud)

以下logstach conf文件将我的数据正确导入弹性搜索:

input {
    jdbc {
        # Postgres jdbc connection string to our database, mydb
        jdbc_connection_string => "xxx"
        # The user we wish to execute our statement as
        jdbc_user => "xxx"
        jdbc_password => "xxx"
        # The path to our downloaded jdbc driver
        jdbc_driver_library => "xxx"
        # The name of the driver class for Postgresql
        jdbc_driver_class => "org.postgresql.Driver"
        # our query
        statement => "select * from tom_test2"
    }
}


output {
    elasticsearch {
        hosts => ["xxx"]
        index => "tom"
        document_type => "tom_test"
    }
}
Run Code Online (Sandbox Code Playgroud)

在弹性搜索中:

GET tom/tom_test/_search

  "hits": {
    "total": 176805,
    "max_score": 1,
}
Run Code Online (Sandbox Code Playgroud)

我在弹性搜索中删除我的索引:

delete tom
Run Code Online (Sandbox Code Playgroud)

我现在想使用jdbc_page_size进行相同的操作,以防我的数据变大,我的logstach conf文件现在是:

input {
    jdbc {
        # Postgres jdbc connection string to our database, mydb
        jdbc_connection_string => "xxx"
        # The user we wish to execute our statement as
        jdbc_user => "xxx"
        jdbc_password => "xxx"
        # The path to our downloaded jdbc driver
        jdbc_driver_library => "xxx"
        # The name of the driver class for Postgresql
        jdbc_driver_class => "org.postgresql.Driver"
        # our query
        statement => "select * from tom_test2"

        jdbc_page_size => 1000
        jdbc_paging_enabled => true
    }
}


output {
    elasticsearch {
        hosts => ["xxx"]
        index => "tom"
        document_type => "tom_test"
    }
}
Run Code Online (Sandbox Code Playgroud)

我的计数现在错了:

GET tom/tom_test/_search

  "hits": {
    "total": 106174,
    "max_score": 1,
}
Run Code Online (Sandbox Code Playgroud)

如176805-106174 = 70631行丢失

Ily*_*hin 2

你面临这个问题的原因 - 你有排序问题:你的查询不能控制接收数据的顺序,并且一般来说 postgresql 不应该保证在无序的后续分页调用中你不会获取相同的数据:这会产生一些数据根本不会被获取,而一些数据将被多次获取的情况:(即使在这些调用期间没有修改数据,后台真空工作人员也可能会更改物理文件中数据的顺序,并且从而重现所描述的情况。

要么在您的报表中添加顺序 SELECT * FROM tom_test2 ORDER BY id并对您的数据进行分页。但请注意:在这种情况下,您上传​​到elasticsearch将无法确保该表在某个时刻的精确副本。其原因是,在后续分页请求的logstash处理过程中,引入了即将到来的页面中的数据更新,即您当前正在上传第1页到第10000页,并且更新发生在第10001页和第20000页上的数据,然后是其他情况...所以你的数据一致性有问题。

或者,如果您想获取所有数据并慷慨地使用logstash...上的内存,那么您需要控制参数jdbc_fetch_size:即您正在执行相同的操作SELECT * FROM tom_test2。使用这种方法,您将创建单个查询结果集,但会将其“泵送”成碎片,并且“泵送”期间的数据修改不会导致您:您将在查询开始时获取状态。