使用 Elasticsearch for python 批量索引/创建文档

bud*_*mat 1 python json bulkinsert elasticsearch elasticsearch-py

我使用 python 生成大量具有随机内容的 elasticsearch 文档,并使用elasticsearch-py对其进行索引。

简化的工作示例(只有一个字段的文档):

from elasticsearch import Elasticsearch
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    es_client.index(index='my_index', document=document)
Run Code Online (Sandbox Code Playgroud)

由于这会对每个文档发出一个请求,因此我尝试通过使用 API 发送 1000 个文档块来加快速度_bulk。然而,到目前为止我的尝试还没有成功。

我对文档的理解是,您可以将 iterable 传递给bulk(),所以我尝试了:

from elasticsearch import Elasticsearch
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

document_list = []
for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    document_list.append(document)
    if i % 1000 == 0:
        es_client.bulk(operations=document_list, index='my_index')
        document_list = []
Run Code Online (Sandbox Code Playgroud)

但这会导致

elasticsearch.BadRequestError: BadRequestError(400, 'illegal_argument_exception', '格式错误的操作/元数据行 [1],应为 START_OBJECT 或 END_OBJECT,但发现为 [VALUE_STRING]')

bud*_*mat 6

好吧,看来我混淆了两个不同的功能:helpers.bulk()Elasticsearch.bulk()。两者都可以用来实现我想要做的事情,但它们的签名略有不同。

helpers.bulk()函数采用一个Elasticsearch()对象和一个包含文档的可迭代对象作为参数。该操作可以指定为_op_type并且可以是indexcreatedelete、 或之一update。由于_op_type默认为index,我们可以省略它并简单地传递文档列表,在这种情况下:

from elasticsearch import Elasticsearch, helpers
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

document_list = []
for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    document_list.append(document)
    if i % 1000 == 0:
        helpers.bulk(es_client, document_list, index='my_index')
        document_list = []
Run Code Online (Sandbox Code Playgroud)

这很好用。

Elasticsearch.bulk()函数可以替代使用,但动作/操作是强制性的,作为可迭代的一部分,并且语法略有不同。这意味着dict我们需要指定dict操作(在本例中"index": {}为 )以及每个文档的正文,而不仅仅是包含文档内容。另请参阅_bulk文档

from elasticsearch import Elasticsearch
from random import getrandbits

es_client = Elasticsearch('https://elastic.host:9200')

actions_list = []
for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    actions_list.append({"index": {}, "doc": document})
    if i % 1000 == 0:
        es_client.bulk(operations=actions_list, index='my_index')
        actions_list = []
Run Code Online (Sandbox Code Playgroud)

这也很好用。

我假设以上两者_bulk在内部生成相同的 REST API 语句,因此它们最终应该是等效的。