我目前正在向aws kinesis流发送一系列xml消息,我一直在不同的项目中使用它,所以我非常有信心这一点有效.然后我写了一个lambda来处理从kinesis流到kinesis firehose的事件:
import os
import boto3
import base64
firehose = boto3.client('firehose')
def lambda_handler(event, context):
deliveryStreamName = os.environ['FIREHOSE_STREAM_NAME']
# Send record directly to firehose
for record in event['Records']:
data = record['kinesis']['data']
response = firehose.put_record(
DeliveryStreamName=deliveryStreamName,
Record={'Data': data}
)
print(response)
Run Code Online (Sandbox Code Playgroud)
我将kinesis流设置为lamdba触发器,并将批量大小设置为1,并将起始位置设置为LATEST.
对于kinesis firehose我有以下配置:
Data transformation*: Disabled
Source record backup*: Disabled
S3 buffer size (MB)*: 10
S3 buffer interval (sec)*: 60
S3 Compression: UNCOMPRESSED
S3 Encryption: No Encryption
Status: ACTIVE
Error logging: Enabled
Run Code Online (Sandbox Code Playgroud)
我发送了162个事件,我从s3读取它们,并且我已经设法得到它160,通常它更少.我甚至试图等待几个小时,因为重试时发生了一些奇怪的事情.
任何人都有使用kinesis-> lamdba - > firehose的经验,并看到丢失数据的问题?