通过批量API重新索引弹性搜索,扫描和滚动

Zac*_*ack 12 python indexing elasticsearch reindex elasticsearch-bulk-api

我试图重新索引我的松紧搜索设置,目前正在研究弹性的文献检索使用Python API的例子

关于这一切如何运作我有点困惑.我能够从Python API获取滚动ID:

es = Elasticsearch("myhost")

index = "myindex"
query = {"query":{"match_all":{}}}
response = es.search(index= index, doc_type= "my-doc-type", body= query, search_type= "scan", scroll= "10m")

scroll_id = response["_scroll_id"]
Run Code Online (Sandbox Code Playgroud)

现在我的问题是,这对我有什么用?什么知道滚动ID甚至给我?文档说使用"批量API",但我不知道scoll_id如何影响到这一点,这有点令人困惑.

谁能给出一个简单的例子展示我如何重新索引从这个角度考虑,我已经得到了正确的scroll_id?

ham*_*med 9

这是使用elasticsearch-py重新索引到另一个elasticsearch节点的示例:

from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])

helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des)
Run Code Online (Sandbox Code Playgroud)

您还可以将查询结果重新索引到不同的索引,这里是如何做到的:

from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])

body = {"query": {"term": {"year": "2004"}}}
helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des, query=body)
Run Code Online (Sandbox Code Playgroud)


Jet*_*die 7

您好,您可以使用滚动API以最有效的方式浏览所有文档.使用scroll_id,您可以找到存储在服务器上的特定滚动请求的会话.因此,您需要为每个请求提供scroll_id以获取更多项目.

批量api用于更有效的索引文档.复制和索引时你需要两者,但它们并不真正相关.

我确实有一些Java代码可以帮助您更好地了解它的工作原理.

    public void reIndex() {
    logger.info("Start creating a new index based on the old index.");

    SearchResponse searchResponse = client.prepareSearch(MUSIC_INDEX)
            .setQuery(matchAllQuery())
            .setSearchType(SearchType.SCAN)
            .setScroll(createScrollTimeoutValue())
            .setSize(SCROLL_SIZE).execute().actionGet();

    BulkProcessor bulkProcessor = BulkProcessor.builder(client,
            createLoggingBulkProcessorListener()).setBulkActions(BULK_ACTIONS_THRESHOLD)
            .setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
            .setFlushInterval(createFlushIntervalTime())
            .build();

    while (true) {
        searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
                .setScroll(createScrollTimeoutValue()).execute().actionGet();

        if (searchResponse.getHits().getHits().length == 0) {
            logger.info("Closing the bulk processor");
            bulkProcessor.close();
            break; //Break condition: No hits are returned
        }

        for (SearchHit hit : searchResponse.getHits()) {
            IndexRequest request = new IndexRequest(MUSIC_INDEX_NEW, hit.type(), hit.id());
            request.source(hit.sourceRef());
            bulkProcessor.add(request);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)


小智 5

对于遇到此问题的任何人,您可以使用Python客户端中的以下API来重新索引:

https://elasticsearch-py.readthedocs.org/en/master/helpers.html#elasticsearch.helpers.reindex

这将有助于您避免必须滚动和搜索以获取所有数据并使用批量API将数据放入新索引中.