bud*_*mat 1 python json bulkinsert elasticsearch elasticsearch-py
我使用 python 生成大量具有随机内容的 elasticsearch 文档,并使用elasticsearch-py对其进行索引。
简化的工作示例(只有一个字段的文档):
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
es_client.index(index='my_index', document=document)
Run Code Online (Sandbox Code Playgroud)
由于这会对每个文档发出一个请求,因此我尝试通过使用 API 发送 1000 个文档块来加快速度_bulk
。然而,到目前为止我的尝试还没有成功。
我对文档的理解是,您可以将 iterable 传递给bulk()
,所以我尝试了:
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
document_list.append(document)
if i % 1000 == 0:
es_client.bulk(operations=document_list, index='my_index')
document_list = []
Run Code Online (Sandbox Code Playgroud)
但这会导致
elasticsearch.BadRequestError: BadRequestError(400, 'illegal_argument_exception', '格式错误的操作/元数据行 [1],应为 START_OBJECT 或 END_OBJECT,但发现为 [VALUE_STRING]')
好吧,看来我混淆了两个不同的功能:helpers.bulk()
和Elasticsearch.bulk()
。两者都可以用来实现我想要做的事情,但它们的签名略有不同。
该helpers.bulk()
函数采用一个Elasticsearch()
对象和一个包含文档的可迭代对象作为参数。该操作可以指定为_op_type
并且可以是index
、create
、delete
、 或之一update
。由于_op_type
默认为index
,我们可以省略它并简单地传递文档列表,在这种情况下:
from elasticsearch import Elasticsearch, helpers
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
document_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
document_list.append(document)
if i % 1000 == 0:
helpers.bulk(es_client, document_list, index='my_index')
document_list = []
Run Code Online (Sandbox Code Playgroud)
这很好用。
该Elasticsearch.bulk()
函数可以替代使用,但动作/操作是强制性的,作为可迭代的一部分,并且语法略有不同。这意味着dict
我们需要指定dict
操作(在本例中"index": {}
为 )以及每个文档的正文,而不仅仅是包含文档内容。另请参阅_bulk
文档:
from elasticsearch import Elasticsearch
from random import getrandbits
es_client = Elasticsearch('https://elastic.host:9200')
actions_list = []
for i in range(1,10000000):
document = {'my_field': getrandbits(64)}
actions_list.append({"index": {}, "doc": document})
if i % 1000 == 0:
es_client.bulk(operations=actions_list, index='my_index')
actions_list = []
Run Code Online (Sandbox Code Playgroud)
这也很好用。
我假设以上两者_bulk
在内部生成相同的 REST API 语句,因此它们最终应该是等效的。
归档时间: |
|
查看次数: |
6470 次 |
最近记录: |