使用AWS kinesis流,lambda和firehose时,任何经验丰富的数据都会丢失?

lor*_*lew 8 amazon-web-services amazon-kinesis amazon-kinesis-firehose

我目前正在向aws kinesis流发送一系列xml消息,我一直在不同的项目中使用它,所以我非常有信心这一点有效.然后我写了一个lambda来处理从kinesis流到kinesis firehose的事件:

import os
import boto3
import base64

firehose = boto3.client('firehose')


def lambda_handler(event, context):
    deliveryStreamName = os.environ['FIREHOSE_STREAM_NAME']

    # Send record directly to firehose
    for record in event['Records']:
        data = record['kinesis']['data']

        response = firehose.put_record(
            DeliveryStreamName=deliveryStreamName,
            Record={'Data': data}
        )
        print(response)
Run Code Online (Sandbox Code Playgroud)

我将kinesis流设置为lamdba触发器,并将批量大小设置为1,并将起始位置设置为LATEST.

对于kinesis firehose我有以下配置:

Data transformation*: Disabled
Source record backup*: Disabled
S3 buffer size (MB)*: 10
S3 buffer interval (sec)*: 60
S3 Compression: UNCOMPRESSED
S3 Encryption: No Encryption
Status: ACTIVE
Error logging: Enabled
Run Code Online (Sandbox Code Playgroud)

我发送了162个事件,我从s3读取它们,并且我已经设法得到它160,通常它更少.我甚至试图等待几个小时,因为重试时发生了一些奇怪的事情.

任何人都有使用kinesis-> lamdba - > firehose的经验,并看到丢失数据的问题?

k0l*_*pak 1

从我在这里看到的,当您将数据发布到 Kinesis Stream(而不是 FireHose)时,很可能会丢失项目。

由于您在写入 FireHose 时使用put_record,因此它会抛出异常,并且在这种情况下将重试 lambda。(检查该级别是否存在故障是有意义的)。

因此,考虑到我可能认为记录在到达 Kinesis 流之前就丢失了。如果您使用put_records方法将项目发送到 Kinesis 流,则不能保证所有记录都将发送到流(由于超出写入吞吐量或内部错误),某些记录可能无法发送。在这种情况下,失败的记录子集应该由您的代码重新发送(这是 Java示例,抱歉我无法找到 Python 示例)。