Dro*_*ror 3 python elasticsearch pyelasticsearch
在elasticsearch中滚动时,重要的是在每个滚动中提供最新的内容scroll_id:
初始搜索请求和每个后续滚动请求将返回一个新的scroll_id?—仅应使用最新的scroll_id。
以下示例(取自此处)使我感到困惑。首先,滚动初始化:
rs = es.search(index=['tweets-2014-04-12','tweets-2014-04-13'],
scroll='10s',
search_type='scan',
size=100,
preference='_primary_first',
body={
"fields" : ["created_at", "entities.urls.expanded_url", "user.id_str"],
"query" : {
"wildcard" : { "entities.urls.expanded_url" : "*.ru" }
}
}
)
sid = rs['_scroll_id']
Run Code Online (Sandbox Code Playgroud)
然后循环:
tweets = [] while (1):
try:
rs = es.scroll(scroll_id=sid, scroll='10s')
tweets += rs['hits']['hits']
except:
break
Run Code Online (Sandbox Code Playgroud)
它可以工作,但是我看不到sid更新的地方。我相信它是在python客户端内部发生的。但我不明白它是如何工作的...
这是一个老问题,但由于某种原因,在搜索“elasticsearch python scroll”时首先出现了。python 模块提供了一个辅助方法来为您完成所有工作。它是一个生成器函数,它将在管理底层滚动 ID 的同时将每个文档返回给您。
https://elasticsearch-py.readthedocs.io/en/master/helpers.html#scan
下面是一个使用示例:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
query = {
"query": {"match_all": {}}
}
es = Elasticsearch(...)
for hit in scan(es, index="my-index", query=query):
print(hit["_source"]["field"])
Run Code Online (Sandbox Code Playgroud)
使用python请求
import requests
import json
elastic_url = 'http://localhost:9200/my_index/_search?scroll=1m'
scroll_api_url = 'http://localhost:9200/_search/scroll'
headers = {'Content-Type': 'application/json'}
payload = {
"size": 100,
"sort": ["_doc"]
"query": {
"match" : {
"title" : "elasticsearch"
}
}
}
r1 = requests.request(
"POST",
elastic_url,
data=json.dumps(payload),
headers=headers
)
# first batch data
try:
res_json = r1.json()
data = res_json['hits']['hits']
_scroll_id = res_json['_scroll_id']
except KeyError:
data = []
_scroll_id = None
print 'Error: Elastic Search: %s' % str(r1.json())
while data:
print data
# scroll to get next batch data
scroll_payload = json.dumps({
'scroll': '1m',
'scroll_id': _scroll_id
})
scroll_res = requests.request(
"POST", scroll_api_url,
data=scroll_payload,
headers=headers
)
try:
res_json = scroll_res.json()
data = res_json['hits']['hits']
_scroll_id = res_json['_scroll_id']
except KeyError:
data = []
_scroll_id = None
err_msg = 'Error: Elastic Search Scroll: %s'
print err_msg % str(scroll_res.json())
Run Code Online (Sandbox Code Playgroud)
参考:https://www.elastic.co/guide/zh-CN/elasticsearch/reference/current/search-request-scroll.html#search-request-scroll
事实上,代码中有一个错误 - 为了正确使用滚动功能,您应该使用下一次调用scroll()时每次新调用返回的新scroll_id,而不是重用第一个:
\n\n\n\n\n重要的
\n\n初始搜索请求和每个后续滚动请求返回\na新的scroll_id\xe2\x80\x89\xe2\x80\x94\xe2\x80\x89仅应使用最新的scroll_id。
\n
http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html
\n\n它之所以有效,是因为 Elasticsearch 并不总是在调用之间更改scroll_id,并且对于较小的结果集,可以返回与最初返回一段时间相同的scroll_id。去年的讨论是在其他两个用户之间看到相同的问题,相同的scroll_id 被返回一段时间:
\n\n\n\n因此,虽然您的代码适用于较小的结果集,但它是不正确的 - 您需要捕获每次新调用scroll() 时返回的scroll_id 并将其用于下一次调用。
\n