在 Elasticsearch 中插入多个文档 - 批量文档格式化程序

Chr*_*ins 2 python json elasticsearch

TLDR;如何批量格式化我的 JSON 文件以摄取到 Elasticsearch?

我正在尝试将一些 NOAA 数据摄取到 Elasticsearch 中,并且一直在使用 NOAA Python SDK

我编写了以下 Python 脚本来加载数据并将其存储为 JSON 格式。

from noaa_sdk import noaa
import json

n = noaa.NOAA()
alerts = n.alerts()
f = open('nhc_alerts.json', 'w')
json.dump(alerts, f)
f.write('\n')
Run Code Online (Sandbox Code Playgroud)

JSON 输出:

{"@context": ["https://raw.githubusercontent.com/geojson/geojson-ld/master/contexts/geojson-base.jsonld", {"wx": "https://api.weather.gov/ontology#", "@vocab": "https://api.weather.gov/ontology#"}], "type": "FeatureCollection", "features": [{"id": "https://api.weather.gov/alerts/NWS-IDP-PROD-KEEPALIVE-5246", "type": "Feature", "geometry": null, "properties": {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-KEEPALIVE-5246", "@type": "wx:Alert", "id": "NWS-IDP-PROD-KEEPALIVE-5246", "areaDesc": "Montgomery", "geocode": {"UGC": ["MDC031"], "SAME": ["024031"]}, "affectedZones": ["https://api.weather.gov/zones/county/MDC031"], "references": [], "sent": "2020-04-25T19:21:03+00:00", "effective": "2020-04-25T19:21:03+00:00", "onset": null, "expires": "2020-04-25T19:31:03+00:00", "ends": null, "status": "Test", "messageType": "Alert", "category": "Met", "severity": "Unknown", "certainty": "Unknown", "urgency": "Unknown", "event": "Test Message", "sender": "w-nws.webmaster@noaa.gov", "senderName": "NWS", "headline": null, "description": "Monitoring message only. Please disregard.", "instruction": "Monitoring message only. Please disregard.", "response": "None", "parameters": {"PIL": ["NWSKEPWBC"], "BLOCKCHANNEL": ["CMAS", "EAS", "NWEM"]}}}, {"id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179499-3536427", "type": "Feature", "geometry": null, "properties": {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179499-3536427", "@type": "wx:Alert", "id": "NWS-IDP-PROD-4179499-3536427", "areaDesc": "La Salle; Livingston", "geocode": {"UGC": ["ILZ019", "ILZ032"], "SAME": ["017099", "017105"]}, "affectedZones": ["https://api.weather.gov/zones/forecast/ILZ019", "https://api.weather.gov/zones/forecast/ILZ032"], "references": [{"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179245-3536278", "identifier": "NWS-IDP-PROD-4179245-3536278", "sender": "w-nws.webmaster@noaa.gov", "sent": "2020-04-25T10:02:00-05:00"}, {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4178935-3536074", "identifier": "NWS-IDP-PROD-4178935-3536074", "sender": "w-nws.webmaster@noaa.gov", "sent": "2020-04-25T03:09:00-05:00"}], "sent": "2020-04-25T14:21:00-05:00", "effective": "2020-04-25T14:21:00-05:00", "onset": "2020-04-25T14:21:00-05:00", "expires": "2020-04-25T22:30:00-05:00", "ends": "2020-04-26T01:00:00-05:00", "status": "Actual", "messageType": "Update", "category": "Met", "severity": "Severe", "certainty": "Possible", "urgency": "Future", "event": "Flood Watch", "sender": "w-nws.webmaster@noaa.gov", "senderName": "NWS Chicago IL", "headline": "Flood Watch issued April 25 at 2:21PM CDT until April 26 at 1:00AM CDT by NWS Chicago IL", "description": "The Flood Watch is now in effect for\n\n* Livingston and La Salle counties in north central Illinois\n\n* Until 1 AM CDT Sunday\n\n* WHAT...Steady rain. One to two inches of rain has already\nfallen. Additional rainfall amounts of one inch or locally more\nare possible which may lead to total rainfall amounts in excess\nof three inches.\n\n* IMPACTS...Rises in rivers and small streams will occur with\nflooding possible. This especially includes the Vermilion River\nand its tributary streams, and the Illinois River. Roadways,\nviaducts, ditches, agricultural land, and other poor drainage\nareas may become flooded.", "instruction": "A Flood Watch means there is a potential for flooding based on\ncurrent forecasts.\n\nYou should monitor later forecasts and be alert for possible\nFlood Warnings. Those living in areas prone to flooding should be\nprepared to take action should flooding develop.", "response": "Prepare", "parameters": {"NWSheadline": ["FLOOD WATCH NOW IN EFFECT UNTIL 1 AM CDT SUNDAY"], "VTEC": ["/O.EXT.KLOT.FA.A.0002.000000T0000Z-200426T0600Z/"], "EAS-ORG": ["WXR"], "PIL": ["LOTFFALOT"], "BLOCKCHANNEL": ["CMAS", "EAS", "NWEM"], "eventEndingTime": ["2020-04-26T01:00:00-05:00"]}}}, {"id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179497-3536425", "type": "Feature", "geometry": null, "properties": {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179497-3536425", "@type": "wx:Alert", "id": "NWS-IDP-PROD-4179497-3536425", "areaDesc": "San Luis Obispo County Central Coast; Santa Barbara County Central Coast; Santa Ynez Valley", "geocode": {"UGC": ["CAZ034", "CAZ035", "CAZ036"], "SAME": ["006079", "006083"]}, "affectedZones": ["https://api.weather.gov/zones/forecast/CAZ034", "https://api.weather.gov/zones/forecast/CAZ035", "https://api.weather.gov/zones/forecast/CAZ036"], "references": [{"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4177692-3535278", "identifier": "NWS-IDP-PROD-4177692-3535278", "sender": "w-nws.webmaster@noaa.gov", "sent": "2020-04-24T08:54:00-07:00"}, {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4178774-3535999", "identifier": "NWS-IDP-PROD-4178774-3535999", "sender": "w-nws.webmaster@noaa.gov", "sent": "2020-04-24T21:37:00-07:00"}, {"@id": "https://api.weather.gov/alerts/NWS-IDP-PROD-4179040-3536147", "identifier": "NWS-IDP-PROD-4179040-3536147", "sender": "w-nws.webmaster@noaa.gov", "sent": 
Run Code Online (Sandbox Code Playgroud)

这个脚本解决了我遇到的一些格式化问题,我的下一个障碍是尝试格式化它,以便我可以利用 elasticsearch 中的批量导入功能。我偶然发现了一个在一定程度上有效的答案,我遇到的问题是它会插入适当的索引字符串,但它是在每个字符之后执行的。

批量转换脚本:

import json


JSON_FILE_IN = "nhc_alerts.json"
JSON_FILE_OUT = "nhc_bulk.json"


out = open(JSON_FILE_OUT, 'w')
with open(JSON_FILE_IN, 'r') as json_in:
    docs = json.dumps(json_in.read())
    for doc in docs:
        out.write('%s\n' % json.dumps({'index': {}}));
        out.write('%s\n' % json.dumps(doc, indent=0).replace('\n', ''))
Run Code Online (Sandbox Code Playgroud)

批量脚本的输出:

{"index": {}}
"\""
{"index": {}}
"{"
{"index": {}}
"\\"
{"index": {}}
"\""
{"index": {}}
"@"
{"index": {}}
"c"
{"index": {}}
"o"
{"index": {}}
"n"
{"index": {}}
"t"
{"index": {}}
"e"
{"index": {}}
"x"
{"index": {}}
"t"
{"index": {}}
"\\"
{"index": {}}
"\""
{"index": {}}
":"
{"index": {}}
" "
{"index": {}}
"["
{"index": {}}
"\\"
{"index": {}}
"\""
{"index": {}}
"h"
{"index": {}}
"t"
{"index": {}}
"t"
{"index": {}}
"p"
{"index": {}}
"s"
{"index": {}}
":"
{"index": {}}
"/"
{"index": {}}
"/"
{"index": {}}
"r"
{"index": {}}
"a"
{"index": {}}
"w"
{"index": {}}
"."
{"index": {}}
"g"
{"index": {}}
"i"
{"index": {}}
"t"
{"index": {}}
"h"
{"index": {}}
"u"
{"index": {}}
"b"
{"index": {}}
"u"
{"index": {}}
"s"
{"index": {}}
"e"
{"index": {}}
"r"
{"index": {}}
"c"
{"index": {}}
"o"
{"index": {}}
"n"
{"index": {}}
Run Code Online (Sandbox Code Playgroud)

理想情况下,我想将这两个脚本合并为一个,但此时,如果完成工作,我将运行两个单独的脚本。

Joe*_*ook 7

bulkpython官方客户端的方法怎么样?

import json

from noaa_sdk import noaa
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


noaa_client = noaa.NOAA()
alerts = noaa_client.alerts()['features']

es = Elasticsearch()


def save_alerts():
    with open('nhc_alerts.json', 'w') as f:
        f.write(json.dumps(alerts))


def bulk_sync():
    actions = [
        {
            "_index": "my_noaa_index",
            "_source": alert
        } for alert in alerts
    ]

    bulk(es, actions)


save_alerts()
bulk_sync()

Run Code Online (Sandbox Code Playgroud)