ali*_*irz 23 python stream boto amazon-web-services
我似乎无法找到一个体面的例子,展示如何通过Python使用AWS Kinesis流.有人可以请我提供一些我可以研究的例子吗?
最好
Eya*_* Ch 33
你应该使用boto.kinesis:
from boto import kinesis
Run Code Online (Sandbox Code Playgroud)
创建流后:
第1步:连接到aws kinesis:
auth = {"aws_access_key_id":"id", "aws_secret_access_key":"key"}
connection = kinesis.connect_to_region('us-east-1',**auth)
Run Code Online (Sandbox Code Playgroud)
第2步:获取流信息(如果有多少个分片,如果它是活动的..)
tries = 0
while tries < 10:
tries += 1
time.sleep(1)
try:
response = connection.describe_stream('stream_name')
if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
break
except :
logger.error('error while trying to describe kinesis stream : %s')
else:
raise TimeoutError('Stream is still not active, aborting...')
Run Code Online (Sandbox Code Playgroud)
第3步:获取所有分片ID,并为每个共享ID获取分片迭代器:
shard_ids = []
stream_name = None
if response and 'StreamDescription' in response:
stream_name = response['StreamDescription']['StreamName']
for shard_id in response['StreamDescription']['Shards']:
shard_id = shard_id['ShardId']
shard_iterator = connection.get_shard_iterator(stream_name, shard_id, shard_iterator_type)
shard_ids.append({'shard_id' : shard_id ,'shard_iterator' : shard_iterator['ShardIterator'] })
Run Code Online (Sandbox Code Playgroud)
第4步:读取每个分片的数据
limit是您要接收的记录的限制.(您最多可以获得10 MB)shard_iterator是上一步的共享.
tries = 0
result = []
while tries < 100:
tries += 1
response = connection.get_records(shard_iterator = shard_iterator , limit = limit)
shard_iterator = response['NextShardIterator']
if len(response['Records'])> 0:
for res in response['Records']:
result.append(res['Data'])
return result , shard_iterator
Run Code Online (Sandbox Code Playgroud)
在下次调用get_records时,您应该使用您收到的shard_iterator和前一个get_records的结果.
注意:在一次get_records调用中,(limit = None)你可以收到空记录.如果使用限制调用get_records,您将获得同一分区键中的记录(当您将数据放入流中时,您必须使用分区键:
connection.put_record(stream_name, data, partition_key)
Run Code Online (Sandbox Code Playgroud)
虽然这个问题已经得到解答,但未来的读者可能会考虑使用Kinesis Client Library (KCL) for Python而不是boto直接使用.当您有多个使用者实例和/或更改分片配置时,它可以简化流的使用.
https://aws.amazon.com/blogs/aws/speak-to-kinesis-in-python/
更完整地列举了KCL提供的内容
粗体中的项目是我认为KCL实际上提供的非常重要的价值.但根据你的用例,boto可能会简单得多.
| 归档时间: |
|
| 查看次数: |
13485 次 |
| 最近记录: |