Elasticsearch python API:按查询删除文档

sys*_*ser 4 python elasticsearch pyes pyelasticsearch

我看到以下API将在Elasticsearch中通过查询进行删除 - http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-delete-by-query.html

但我想对弹性搜索批量API做同样的事情,即使我可以使用批量上传文档

es.bulk(body=json_batch)
Run Code Online (Sandbox Code Playgroud)

我不知道如何使用python批量API进行弹性搜索来调用查询删除.

小智 7

看看elasticsearch如何通过查询API弃用删除.我使用绑定创建了这个python脚本来做同样的事情.首先定义ES连接:

import elasticsearch
es = elasticsearch.Elasticsearch(['localhost'])
Run Code Online (Sandbox Code Playgroud)

现在,您可以使用它来为要删除的结果创建查询.

search=es.search(
    q='The Query to ES.',
    index="*logstash-*",
    size=10,
    search_type="scan",
    scroll='5m',
)
Run Code Online (Sandbox Code Playgroud)

现在,您可以循环滚动该查询.在我们这样做时生成我们的请求.

 while True:
    try: 
      # Git the next page of results. 
      scroll=es.scroll( scroll_id=search['_scroll_id'], scroll='5m', )
    # Since scroll throws an error catch it and break the loop. 
    except elasticsearch.exceptions.NotFoundError: 
      break 
    # We have results initialize the bulk variable. 
    bulk = ""
    for result in scroll['hits']['hits']:
      bulk = bulk + '{ "delete" : { "_index" : "' + str(result['_index']) + '", "_type" : "' + str(result['_type']) + '", "_id" : "' + str(result['_id']) + '" } }\n'
    # Finally do the deleting. 
    es.bulk( body=bulk )
Run Code Online (Sandbox Code Playgroud)

要使用批量api,您需要确保两件事:

  1. 文档已标识您要更新.(索引,类型,id)
  2. 每个请求都以换行符或/ n终止.


drs*_*drs 5

elasticsearch-py散装API允许你通过包括批量删除记录,'_op_type': 'delete'每个记录。但是,如果要按查询删除,则仍然需要进行两个查询:一个查询要删除的记录,另一个查询要删除它们。

批量执行此操作的最简单方法是使用python模块的scan()帮助程序,该帮助程序包装了ElasticSearch Scroll API,因此您不必跟踪_scroll_ids。与bulk()帮助程序一起使用,以代替已弃用的delete_by_query()

from elasticsearch.helpers import bulk, scan

bulk_deletes = []
for result in scan(es,
                   query=es_query_body,  # same as the search() body parameter
                   index=ES_INDEX,
                   doc_type=ES_DOC,
                   _source=False,
                   track_scores=False,
                   scroll='5m'):

    result['_op_type'] = 'delete'
    bulk_deletes.append(result)

bulk(elasticsearch, bulk_deletes)
Run Code Online (Sandbox Code Playgroud)

由于_source=False已传递,因此不返回文档正文,因此每个结果都非常小。但是,如果您有内存限制,则可以很轻松地进行批处理:

BATCH_SIZE = 100000

i = 0
bulk_deletes = []
for result in scan(...):

    if i == BATCH_SIZE:
        bulk(elasticsearch, bulk_deletes)
        bulk_deletes = []
        i = 0

    result['_op_type'] = 'delete'
    bulk_deletes.append(result)

    i += 1

bulk(elasticsearch, bulk_deletes)
Run Code Online (Sandbox Code Playgroud)


小智 5

我目前正在使用基于@drs 响应的脚本,但始终使用bulk()帮助程序。它能够使用chunk_size参数(默认为 500,请参阅straming_bulk()了解更多信息)从迭代器创建批量作业。

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, bulk

BULK_SIZE = 1000

def stream_items(es, query):
    for e in scan(es, 
                  query=query, 
                  index=ES_INDEX,
                  doc_type=ES_DOCTYPE, 
                  scroll='1m',
                  _source=False):

        # There exists a parameter to avoid this del statement (`track_source`) but at my version it doesn't exists.
        del e['_score']
        e['_op_type'] = 'delete'
        yield e

es = Elasticsearch(host='localhost')
bulk(es, stream_items(es, query), chunk_size=BULK_SIZE)
Run Code Online (Sandbox Code Playgroud)