使用Kinesis中的分区键来保证具有相同密钥的记录由同一记录处理器(lambda)处理

one*_*ing 6 amazon-web-services amazon-kinesis aws-lambda

我正在使用AWS kinesis和lambda开发实时数据管道,我试图弄清楚如何保证来自相同数据生成器的记录由同一个分片处理,最终由同一个lambda函数实例处理.

我的方法是使用分区键来确保来自相同生成器的记录由同一个分片处理.但是,我无法使同一个分片中的记录由同一个lambda函数实例处理.

基本设置如下:

  • 有多个数据源将数据发送到kinesis流.
  • 该流有多个分片来处理负载.
  • 有一个lambda函数连接到scream,带有事件源映射(批量大小为500).
  • lambda函数正在处理记录,执行一些数据转换和一些其他事情,然后将所有内容放入firehose.
  • 之后会发生更多事情,但这与问题无关.

它看起来像这样:

在此输入图像描述

如图所示,有三个lambda函数实例被调用进行处理; 每个碎片一个.在此管道中,同一个lambda函数实例处理来自同一数据源的记录非常重要.根据我读到的内容,可以通过确保来自同一源的所有记录使用相同的分区键来保证这一点,以便它们由相同的分片处理.

分区键

分区键用于通过流中的分片对数据进行分组.Kinesis Data Streams服务使用与每个数据记录关联的分区键将属于流的数据记录隔离为多个分片,以确定给定数据记录属于哪个分片.分区键是Unicode字符串,最大长度限制为256个字节.MD5哈希函数用于将分区键映射到128位整数值,并将关联的数据记录映射到分片.当应用程序将数据放入流中时,它必须指定分区键.

资料来源:https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key

这有效.因此,具有相同分区键的记录由相同的分片处理.但是,它们由不同的lambda函数实例处理.因此,每个分片调用一个lambda函数实例,但它不仅处理来自一个分片但来自多个分片的记录.这里似乎没有模式如何将记录移交给lambda.

这是我的测试设置:我将一堆测试数据发送到流中并在lambda函数中打印记录.这是三个函数实例的输出(检查每行末尾的分区键.每个键只应出现在三个日志中的一个而不是多个日志中):

Lambda实例1:

{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'}
Run Code Online (Sandbox Code Playgroud)

Lambda实例2:

{'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'}
{'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'}
{'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'}
{'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'}
{'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'}
Run Code Online (Sandbox Code Playgroud)

Lambda实例3:

{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
Run Code Online (Sandbox Code Playgroud)

这是我将数据插入流中的方法(如您所见,分区键设置为源ID):

processed_records = []
for r in records:
    processed_records.append({
        'PartitionKey': str(r['source']),
        'Data': json.dumps(r),
    })

kinesis.put_records(
    StreamName=stream,
    Records=processed_records,
)
Run Code Online (Sandbox Code Playgroud)

所以我的问题是:

  • 为什么每个lambda函数不能只处理一个碎片的记录?
  • 如何才能做到这一点?

谢谢!

Jos*_*eph 3

您为什么要关心哪个 Lambda 实例处理分片?Lambda 实例无论如何都没有状态,因此哪个实例读取哪个分片并不重要。更重要的是,任何时候 Lambda 实例都只能从一个分片中读取数据。调用完成后,它可以从另一个分片读取。