使用 OpenSearch Python 批量 api 将数据插入到多个索引

anu*_*agi 5 api rest opensearch elasticsearch

本文档展示了如何使用curl中的POST请求插入具有多个索引的批量数据:https://opensearch.org/docs/latest/opensearch/index-data/

如果我有这种格式的数据,

[
{ "index": { "_index": "index-2022-06-08", "_id": "<id>" } }
{ "A JSON": "document" }
{ "index": { "_index": "index-2022-06-09", "_id": "<id>" } }
{ "A JSON": "document" }
{ "index": { "_index": "index-2022-06-10", "_id": "<id>" } }
{ "A JSON": "document" }
]
Run Code Online (Sandbox Code Playgroud)

批量请求应从以下位置获取索引名称"_index": "index-2022-06-08"

我试图使用 OpenSearch-py 库来做同样的事情,但我找不到任何示例片段可以做到这一点。我正在使用此格式从 AWS Lambda 发送请求。

client = OpenSearch(
            hosts = [{'host': host, 'port': 443}],
            http_auth = awsauth,
            use_ssl = True,
            verify_certs = True,
            connection_class = RequestsHttpConnection
            )
        
        resp = helpers.bulk(client, logs, index= index_name, max_retries = 3)
Run Code Online (Sandbox Code Playgroud)

在这里,我必须提到索引名称作为批量请求中的参数,因此它不会从数据本身获取索引名称。如果我没有在参数中提及index_name,则会收到错误4xx index_name丢失。

我还在研究批量 api 源代码:https://github.com/opensearch-project/opensearch-py/blob/main/opensearchpy/helpers/actions.py#L373

看起来index_name不是一个强制参数。

谁能帮助我解决我所缺少的东西?

小智 6

我遇到了同样的问题,并在elasticsearch.py​​bulk-helpers文档中找到了解决方案。_source-structure当搜索端点返回的文档中提供时,它就可以工作。

批量方法的调用:

resp = helpers.bulk(
    self.opensearch,
    actions,
    max_retries=3,
)
Run Code Online (Sandbox Code Playgroud)

哪里actions有这样的字典列表:

[{
    '_op_type': 'update',
    '_index': 'index-name',
    '_id': 42,
    '_source': {
        "title": "Hello World!",
        "body": "..."
    }
}]
Run Code Online (Sandbox Code Playgroud)

_op_type可以用作附加字段来定义应为文档调用的操作( indexupdate、 、...)。delete

希望这可以帮助任何遇到同样问题的人!