如何在Elasticsearch中批量插入而忽略过程中可能出现的所有错误?

Boo*_*oom 2 python csv elasticsearch

我用的是Elasticsearch6.8版本。

我需要将大约 10000 个文档(来自 csv 文件)插入到现有索引和映射索引中。

我正在使用python(版本 3.7)代码:

    import csv  
    es = Elasticsearch();
    from elasticsearch import helpers
    with open(file_path) as f:
        reader = csv.DictReader(f)
        helpers.bulk(es, reader, index=index_name, doc_type=doc_type)
Run Code Online (Sandbox Code Playgroud)

但我收到错误:

raise BulkIndexError("%i document(s) failed to index." % len(errors), errors)
elasticsearch.helpers.errors.BulkIndexError: ('3 document(s) failed to index.'
Run Code Online (Sandbox Code Playgroud)

发生错误的原因是 csv 文件中的某些值具有字符串值而不是浮点值。

499 个文档后批量停止,应用程序崩溃。

有没有办法批量批量处理所有文档(~10000),如果出现错误(由于映射或错误的值),请告诉python/elastic忽略这些文档并继续批量操作?

Kev*_*zel 5

您可以将 arg 设置raise_on_errorFalse,因为它是True默认的,如Python 批量文档中建议的那样。它应该看起来像这样:

helpers.bulk(es, reader, index=index_name, doc_type=doc_type, raise_on_error=False)
Run Code Online (Sandbox Code Playgroud)

请记住:

收集错误时,原始文档数据包含在错误字典中,这可能会导致内存使用量过高。如果您需要处理大量数据并希望忽略/收集错误,请考虑使用Streaming_bulk()帮助程序,它只会返回错误而不将它们存储在内存中。

您还可以查看此Python ES 批量示例中的示例 12、25 和 39