ima*_*747 5 elasticsearch amazon-kinesis-firehose
我们有一个firehose,可以将记录发送到Elasticsearch Service集群。我们的集群已满,一些记录已故障转移到S3。https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#retry上的文档指出,失败的记录可用于回填:“已跳过的文档会在elasticsearch_failed /文件夹,您可以将其用于手动回填”,但是我找不到任何有关如何完成此操作的文档。
查看记录,它们似乎是包含JSON blob的文本文件的gzip文件,其中的“ rawData”字段包含我们发送到firehose的原始记录的base64编码的字符串。
是否存在现有工具来处理S3中的这些gzip文件,对其进行分解并重新提交记录?该文档暗示您可以“手动进行回填”,这是一个非常标准化的流程,因此我以为以前有人进行过此操作,但我找不到方法。
小智 1
我认为手动回填意味着使用 AWS SDK 之一将文档再次发送到 Elasticsearch 中。python 中的示例(使用 boto3)从 S3 读取故障文件并将其中的文档发送到 Elasticsearch:
es_client = boto3.client('es', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)
s3_client = boto3.client('s3', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)
file = s3_client.get_object(Bucket=bucket, Key=key)
text = file['Body'].read().decode("utf-8")
failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('\n'))))
for case in failure_cases:
try:
data = base64.b64decode(case['rawData'])
es_instance.create(index=case['esIndexName'], id=case['esDocumentId'], body=data)
logger.debug("Successfully sent {}".format(case['esDocumentId']))
except RequestError:
logger.info("Retry failed for Document ID {}\nReason: {}"
.format(case['esDocumentId'], case['errorMessage']))
Run Code Online (Sandbox Code Playgroud)