I want to read a CSV file row by row and insert each row into Elasticsearch. I'm new to both Python and Elasticsearch. How do I convert a CSV row and insert the rows into ES one by one?
import csv
import json
from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
print(es)

def csv_reader(file_obj, delimiter=','):
    reader = csv.reader(file_obj)
    i = 1
    results = []
    for row in reader:
        print(row)
        es.index(index='product', doc_type='prod', id=i,
                 body=json.dump([row for row in reader], file_obj))
        i = i + 1
        results.append(row)
        print(row)

if __name__ == "__main__":
    with open("/home/Documents/csv/acsv.csv") as f_obj:
        csv_reader(f_obj)
But I get this error:

Traceback (most recent call last):
  File "/home/PycharmProjects/CsvReaderForSyncEs/csvReader.py", line 25, in <module>
    csv_reader(f_obj)
  File "/home/PycharmProjects/CsvReaderForSyncEs/csvReader.py", line 17, in csv_reader
    es.index(index='product', doc_type='prod', id=i, body=json.dump([row for row in reader], file_obj))
  File "/usr/lib/python2.7/json/__init__.py", line 190, in dump
    fp.write(chunk)
IOError: File not open for writing
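The immediate cause of the IOError: json.dump(obj, fp) writes JSON to the file object fp, and here fp is the CSV file opened for reading only. What es.index(body=...) needs is a string, which json.dumps returns. A minimal stdlib-only illustration (the row values below are hypothetical sample data, not from the original file):

    import json

    row = ["1", "Widget", "9.99"]  # a hypothetical CSV row

    # json.dump(obj, fp) WRITES to fp -> fails on a read-only file.
    # json.dumps(obj) RETURNS a string -> suitable for body=...
    body = json.dumps({"id": row[0], "name": row[1], "price": row[2]})
    print(body)

Note also that the list comprehension [row for row in reader] inside the loop consumes the rest of the reader, so the loop would stop after one iteration even if the JSON call were fixed.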
Try the bulk API.
import csv
from elasticsearch import helpers, Elasticsearch

def csv_reader(file_name):
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    with open(file_name, 'r') as outfile:
        reader = csv.DictReader(outfile)
        helpers.bulk(es, reader, index="index_name", doc_type="type")
More information about the bulk API is available at https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
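Why this works: csv.DictReader yields one dict per row, keyed by the header line, and helpers.bulk treats each dict as one document to index. You can verify the shape of the data without an Elasticsearch server using only the standard library (the CSV content below is hypothetical sample data):

    import csv
    import io

    # In-memory stand-in for a file like acsv.csv.
    data = io.StringIO("id,name,price\n1,Widget,9.99\n2,Gadget,4.50\n")

    # Each row becomes a dict keyed by the header -- the shape
    # helpers.bulk(es, actions, ...) accepts as its action source.
    actions = list(csv.DictReader(data))
    print(actions)

Each of these dicts is sent in a single bulk request instead of one HTTP round trip per row, which is why the bulk helper is much faster for loading a whole file.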