Cra*_*son 3 amazon-web-services amazon-kinesis aws-lambda amazon-kinesis-kpl
我\xe2\x80\x99m 使用 Lambda 将数据记录加载到 Kinesis 中,并且经常想要添加最多 500K 条记录,我将这些记录批量分成 500 条,并使用 Boto 的方法将put_records
它们发送到 Kinesis。我有时会看到由于超出允许的吞吐量而导致的失败。
发生这种情况时重试的最佳方法是什么?理想情况下,我不想在数据流中出现重复的消息,所以我不想简单地重新发送所有 500 条记录,但我很难知道如何仅重试失败的消息。该方法的响应put_records
似乎非常有用。
我可以依赖响应记录列表的顺序与我传递给 putRecords 的列表的顺序相同吗?
\n\n我知道我可以增加分片数量,但我\xe2\x80\x99d 希望显着增加将数据加载到此 Kinesis 流的并行 Lambda 函数的数量。我们计划根据源系统对数据进行分区,我不能保证多个函数不会将数据写入同一个分片并超出允许的吞吐量。因此,我认为增加分片不会消除重试策略的需要。
\n\n或者,有人知道 KPL 是否会自动为我处理这个问题?
\n我可以依赖响应记录列表的顺序与我传递给 putRecords 的列表的顺序相同吗?
是的。您必须依赖响应的顺序。响应记录的顺序与请求记录的顺序相同。
请检查putrecords
回复,https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html。
记录:成功和失败处理的记录结果的数组,通过自然排序与请求相关联。成功添加到流中的记录在结果中包含 SequenceNumber 和 ShardId。添加流失败的记录,结果中包含ErrorCode和ErrorMessage。
要重试失败的记录,您必须开发自己的重试机制。我已经使用递归函数在 python 中编写了重试机制,并按以下方式在重试之间进行增量等待。
import boto3
import time
kinesis_client = boto3.client('kinesis')
KINESIS_RETRY_COUNT = 10
KINESIS_RETRY_WAIT_IN_SEC = 0.1
KINESIS_STREAM_NAME = "your-kinesis-stream"
def send_to_stream(kinesis_records, retry_count):
put_response = kinesis_client.put_records(
Records=kinesis_records,
StreamName=KINESIS_STREAM_NAME
)
failed_count = put_response['FailedRecordCount']
if failed_count > 0:
if retry_count > 0:
retry_kinesis_records = []
for idx, record in enumerate(put_response['Records']):
if 'ErrorCode' in record:
retry_kinesis_records.append(kinesis_records[idx])
time.sleep(KINESIS_RETRY_WAIT_IN_SEC * (KINESIS_RETRY_COUNT - retry_count + 1))
send_to_stream(retry_kinesis_records, retry_count - 1)
else:
print(f'Not able to put records after retries. Records = {put_response["Records"]}')
Run Code Online (Sandbox Code Playgroud)
在上面的示例中,您可以根据需要进行KINESIS_RETRY_COUNT
更改。KINESIS_RETRY_WAIT_IN_SEC
此外,您还必须确保 lambda 超时足以重试。
或者,有人知道 KPL 是否会自动为我处理这个问题?
我不确定 KPL,但从文档看来它有自己的重试机制。https://docs.aws.amazon.com/streams/latest/dev/kinesis- Producer-adv-retries-rate-limiting.html
归档时间: |
|
查看次数: |
5039 次 |
最近记录: |