在python中使用kinesis流

ali*_*irz 23 python stream boto amazon-web-services

我似乎无法找到一个体面的例子,展示如何通过Python使用AWS Kinesis流.有人可以请我提供一些我可以研究的例子吗?

最好

Eya*_* Ch 33

你应该使用boto.kinesis:

from boto import kinesis
Run Code Online (Sandbox Code Playgroud)

创建流后:

第1步:连接到aws kinesis:

auth = {"aws_access_key_id":"id", "aws_secret_access_key":"key"}
connection = kinesis.connect_to_region('us-east-1',**auth)
Run Code Online (Sandbox Code Playgroud)

第2步:获取流信息(如果有多少个分片,如果它是活动的..)

tries = 0
while tries < 10:
    tries += 1
    time.sleep(1)
    try:
        response = connection.describe_stream('stream_name')   
        if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
            break 
    except :
        logger.error('error while trying to describe kinesis stream : %s')
else:
    raise TimeoutError('Stream is still not active, aborting...')
Run Code Online (Sandbox Code Playgroud)

第3步:获取所有分片ID,并为每个共享ID获取分片迭代器:

shard_ids = []
stream_name = None 
if response and 'StreamDescription' in response:
    stream_name = response['StreamDescription']['StreamName']                   
    for shard_id in response['StreamDescription']['Shards']:
         shard_id = shard_id['ShardId']
         shard_iterator = connection.get_shard_iterator(stream_name, shard_id, shard_iterator_type)
         shard_ids.append({'shard_id' : shard_id ,'shard_iterator' : shard_iterator['ShardIterator'] })
Run Code Online (Sandbox Code Playgroud)

第4步:读取每个分片的数据

limit是您要接收的记录的限制.(您最多可以获得10 MB)shard_iterator是上一步的共享.

tries = 0
result = []
while tries < 100:
     tries += 1
     response = connection.get_records(shard_iterator = shard_iterator , limit = limit)
     shard_iterator = response['NextShardIterator']
     if len(response['Records'])> 0:
          for res in response['Records']: 
               result.append(res['Data'])                  
          return result , shard_iterator
Run Code Online (Sandbox Code Playgroud)

在下次调用get_records时,您应该使用您收到的shard_iterator和前一个get_records的结果.

注意:在一次get_records调用中,(limit = None)你可以收到空记录.如果使用限制调用get_records,您将获得同一分区键中的记录(当您将数据放入流中时,您必须使用分区键:

connection.put_record(stream_name, data, partition_key)
Run Code Online (Sandbox Code Playgroud)

  • 不用说,我现在在这里找到了一个非常完整的例子:https://github.com/awslabs/kinesis-poster-worker!不过,感谢有用的答案! (2认同)

jum*_*and 7

虽然这个问题已经得到解答,但未来的读者可能会考虑使用Kinesis Client Library (KCL) for Python而不是boto直接使用.当您有多个使用者实例和/或更改分片配置时,它可以简化流的使用.

https://aws.amazon.com/blogs/aws/speak-to-kinesis-in-python/

完整地列举了KCL提供的内容

  • 连接到流
  • 枚举分片
  • 协调与其他工作者的碎片关联(如果有的话)
  • 为其管理的每个分片实例化一个记录处理器
  • 从流中提取数据记录
  • 将记录推送到相应的记录处理器
  • 检查点处理的记录 (它使用DynamoDB,因此您的代码不必手动持久保存检查点值)
  • 当工作器实例计数更改时,平衡shard-worker关联
  • 分割或合并分片时平衡分片工作者关联

粗体中的项目是我认为KCL实际上提供的非常重要的价值.但根据你的用例,boto可能会简单得多.

  • 当我受苦时,这个在哪里:( (2认同)