使用python客户端进行elasticsearch滚动

Dro*_*ror 3 python elasticsearch pyelasticsearch

在elasticsearch中滚动时,重要的是在每个滚动中提供最新的内容scroll_id

初始搜索请求和每个后续滚动请求将返回一个新的scroll_id?—仅应使用最新的scroll_id。

以下示例(取自此处)使我感到困惑。首先,滚动初始化:

rs = es.search(index=['tweets-2014-04-12','tweets-2014-04-13'], 
               scroll='10s', 
               search_type='scan', 
               size=100, 
               preference='_primary_first',
               body={
                 "fields" : ["created_at", "entities.urls.expanded_url", "user.id_str"],
                   "query" : {
                     "wildcard" : { "entities.urls.expanded_url" : "*.ru" }
                   }
               }
   )
sid = rs['_scroll_id']
Run Code Online (Sandbox Code Playgroud)

然后循环:

tweets = [] while (1):
    try:
        rs = es.scroll(scroll_id=sid, scroll='10s')
        tweets += rs['hits']['hits']
    except:
        break
Run Code Online (Sandbox Code Playgroud)

它可以工作,但是我看不到sid更新的地方。我相信它是在python客户端内部发生的。但我不明白它是如何工作的...

Rya*_*ier 6

这是一个老问题,但由于某种原因,在搜索“elasticsearch python scroll”时首先出现了。python 模块提供了一个辅助方法来为您完成所有工作。它是一个生成器函数,它将在管理底层滚动 ID 的同时将每个文档返回给您。

https://elasticsearch-py.readthedocs.io/en/master/helpers.html#scan

下面是一个使用示例:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

query = {
    "query": {"match_all": {}}
}

es = Elasticsearch(...)
for hit in scan(es, index="my-index", query=query):
    print(hit["_source"]["field"])
Run Code Online (Sandbox Code Playgroud)


anj*_*505 5

使用python请求

import requests
import json

elastic_url = 'http://localhost:9200/my_index/_search?scroll=1m'
scroll_api_url = 'http://localhost:9200/_search/scroll'
headers = {'Content-Type': 'application/json'}

payload = {
    "size": 100,
    "sort": ["_doc"]
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}

r1 = requests.request(
    "POST",
    elastic_url,
    data=json.dumps(payload),
    headers=headers
)

# first batch data
try:
    res_json = r1.json()
    data = res_json['hits']['hits']
    _scroll_id = res_json['_scroll_id']
except KeyError:
    data = []
    _scroll_id = None
    print 'Error: Elastic Search: %s' % str(r1.json())
while data:
    print data
    # scroll to get next batch data
    scroll_payload = json.dumps({
        'scroll': '1m',
        'scroll_id': _scroll_id
    })
    scroll_res = requests.request(
        "POST", scroll_api_url,
        data=scroll_payload,
        headers=headers
    )
    try:
        res_json = scroll_res.json()
        data = res_json['hits']['hits']
        _scroll_id = res_json['_scroll_id']
    except KeyError:
        data = []
        _scroll_id = None
        err_msg = 'Error: Elastic Search Scroll: %s'
        print err_msg % str(scroll_res.json())
Run Code Online (Sandbox Code Playgroud)

参考:https//www.elastic.co/guide/zh-CN/elasticsearch/reference/current/search-request-scroll.html#search-request-scroll


Joh*_*one 4

事实上,代码中有一个错误 - 为了正确使用滚动功能,您应该使用下一次调用scroll()时每次新调用返回的新scroll_id,而不是重用第一个:

\n\n
\n

重要的

\n\n

初始搜索请求和每个后续滚动请求返回\na新的scroll_id\xe2\x80\x89\xe2\x80\x94\xe2\x80\x89仅应使用最新的scroll_id。

\n
\n\n

http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html

\n\n

它之所以有效,是因为 Elasticsearch 并不总是在调用之间更改scroll_id,并且对于较小的结果集,可以返回与最初返回一段时间相同的scroll_id。去年的讨论是在其他两个用户之间看到相同的问题,相同的scroll_id 被返回一段时间:

\n\n

http://elasticsearch-users.115913.n3.nabble.com/Distributing-query-results-using-scrolling-td4036726.html

\n\n

因此,虽然您的代码适用于较小的结果集,但它是不正确的 - 您需要捕获每次新调用scroll() 时返回的scroll_id 并将其用于下一次调用。

\n