Dir*_*irk 6 python elasticsearch pandas
我想将一堆大型pandas数据帧(几百万行和50列)索引到Elasticsearch中.
在查找如何执行此操作的示例时,大多数人将使用elasticsearch-py的批量帮助程序方法,向其传递处理连接的Elasticsearch类的实例以及使用pandas的dataframe.to_dict创建的字典列表( orient ='records')方法.元数据可以预先作为新列插入到数据帧中,例如df['_index'] = 'my_index'等.
但是,我有理由不使用elasticsearch-py库,并希望直接与Elasticsearch批量API通信,例如通过请求或其他方便的HTTP库.不过df.to_dict(),不幸的是,在大型数据帧上运行速度非常慢,并且将数据帧转换为dicts列表然后通过elasticsearch-py序列化为JSON听起来像是不必要的开销,当有像dataframe.to_json()这样的东西时甚至很快大型数据帧.
将pandas数据帧变为批量API所需格式的简单快捷方法是什么?我认为朝着正确方向迈出的一步是dataframe.to_json()如下:
import pandas as pd
df = pd.DataFrame.from_records([{'a': 1, 'b': 2}, {'a': 3, 'b': 4}, {'a': 5, 'b': 6}])
df
a b
0 1 2
1 3 4
2 5 6
df.to_json(orient='records', lines=True)
'{"a":1,"b":2}\n{"a":3,"b":4}\n{"a":5,"b":6}'
Run Code Online (Sandbox Code Playgroud)
现在这是一个换行符分隔的JSON字符串,但它仍然缺少元数据.什么是一种表演方式来获得它?
编辑: 为了完整性,元数据JSON文档将如下所示:
{"index": {"_index": "my_index", "_type": "my_type"}}
Run Code Online (Sandbox Code Playgroud)
因此,最终批量API所期望的整个JSON看起来像这样(在最后一行之后有一个额外的换行符):
{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":1,"b":2}
{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":3,"b":4}
{"index": {"_index": "my_index", "_type": "my_type"}}
{"a":5,"b":6}
Run Code Online (Sandbox Code Playgroud)
与此同时,我发现了多种可能性,如何以至少合理的速度做到这一点:
import json
import pandas as pd
import requests
# df is a dataframe or dataframe chunk coming from your reading logic
df['_id'] = df['column_1'] + '_' + df['column_2'] # or whatever makes your _id
df_as_json = df.to_json(orient='records', lines=True)
final_json_string = ''
for json_document in df_as_json.split('\n'):
jdict = json.loads(json_document)
metadata = json.dumps({'index': {'_id': jdict['_id']}})
jdict.pop('_id')
final_json_string += metadata + '\n' + json.dumps(jdict) + '\n'
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
r = requests.post('http://elasticsearch.host:9200/my_index/my_type/_bulk', data=final_json_string, headers=headers, timeout=60)
Run Code Online (Sandbox Code Playgroud)
除了使用 pandas 的to_json()方法之外,还可以使用to_dict()如下方法。在我的测试中这稍微慢一些,但也慢不了多少:
dicts = df.to_dict(orient='records')
final_json_string = ''
for document in dicts:
metadata = {"index": {"_id": document["_id"]}}
document.pop('_id')
final_json_string += json.dumps(metadata) + '\n' + json.dumps(document) + '\n'
Run Code Online (Sandbox Code Playgroud)
当在大型数据集上运行此程序时,可以通过分别安装ujson或Rapidjsonjson来替换 Python 的默认库,然后或,从而节省几分钟。import ujson as jsonimport rapidjson as json
通过将步骤的顺序执行替换为并行执行,可以实现更大的加速,以便在请求等待 Elasticsearch 处理所有文档并返回响应时读取和转换不会停止。这可以通过线程、多处理、异步、任务队列等来完成,但这超出了这个问题的范围。
如果您碰巧找到一种更快地进行 json 转换的方法,请告诉我。
| 归档时间: |
|
| 查看次数: |
6716 次 |
| 最近记录: |