Ans*_*jan 4 python amazon-web-services amazon-kinesis boto3
from __future__ import print_function # Python 2/3 compatibility
import boto3
import json
import decimal

#kinesis = boto3.resource('kinesis', region_name='eu-west-1')
client = boto3.client('kinesis')

with open("questions.json") as json_file:
    questions = json.load(json_file)
    Records = []
    count = 0
    for question in questions:
        value1 = question['value']
        if value1 is None:
            value1 = '0'
        record = { 'StreamName':'LoadtestKinesis', 'Data':b'question','PartitionKey':'value1' }
        Records.append(record)
        count +=1
        if count == 500:
            response = client.put_records(Records)
            Records = []
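This is my Python script for loading an array from a JSON file into a Kinesis stream. I batch 500 records at a time so that I can use the put_records function, but I get an error: put_records() only accepts keyword arguments. How do I pass a list of records to this method? Each record is a JSON object with a partition key.
Sample JSON: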
[{
    "air_date": "2004-12-31",
    "answer": "FDDDe",
    "category": "AACC",
    "question": "'No. 2: 1912 Olympian; football star at Carlisle Indian School; 6 MLB seasons with the Reds, Giants & Braves'",
    "round": "DDSSS!",
    "show_number": "233",
    "value": "$200"
}]
from __future__ import print_function  # Python 2/3 compatibility
import boto3
import json
import time

client = boto3.client('kinesis')


def putdatatokinesis(RecordKinesis):
    # put_records only accepts keyword arguments: Records and StreamName.
    start = time.time()  # time.clock() was removed in Python 3.8
    response = client.put_records(Records=RecordKinesis, StreamName='LoadtestKinesis')
    print("Time taken to process " + str(len(RecordKinesis)) + " records is " + str(time.time() - start) + " s")
    return response


with open("questions.json") as json_file:
    questions = json.load(json_file)

RecordKinesis = []
for question in questions:
    value1 = question['value']
    if value1 is None:
        value1 = '0'
    # Send the question itself as JSON bytes rather than the literal b'question'.
    recordkinesis = {'Data': json.dumps(question).encode('utf-8'), 'PartitionKey': value1}
    RecordKinesis.append(recordkinesis)
    # Flush once the batch holds 500 records, then start a new batch.
    if len(RecordKinesis) == 500:
        putdatatokinesis(RecordKinesis)
        RecordKinesis = []

# Send whatever is left over after the last full batch.
if RecordKinesis:
    putdatatokinesis(RecordKinesis)
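The idea is to pass Records as a keyword argument.
When sending multiple records, wrap them in a list passed as Records, then add the stream identifier as StreamName.
The request format looks like this: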
{
    "Records": [
        {
            "Data": blob,
            "ExplicitHashKey": "string",
            "PartitionKey": "string"
        },
        {
            "Data": "another record",
            "ExplicitHashKey": "string",
            "PartitionKey": "string"
        }
    ],
    "StreamName": "string"
}
For more information, see the Kinesis documentation.
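As a rough sketch of the request shape described above, the batching can be separated from the network call. The helper name build_put_records_requests and its parameters are hypothetical (they are not part of boto3), and the default of 500 mirrors the batch size used in the question. It assumes each item is a JSON-serializable dict and that a missing or None key should fall back to '0', as in the original script.

```python
import json


def build_put_records_requests(items, stream_name, key_field, batch_size=500):
    """Yield keyword-argument dicts for put_records, one per batch.

    Hypothetical helper, not part of boto3. Each item is serialized to
    JSON bytes for 'Data'; its `key_field` value (or '0' when missing or
    None) becomes the 'PartitionKey'.
    """
    batch = []
    for item in items:
        key = item.get(key_field)
        batch.append({
            'Data': json.dumps(item).encode('utf-8'),
            'PartitionKey': '0' if key is None else str(key),
        })
        if len(batch) == batch_size:
            yield {'Records': batch, 'StreamName': stream_name}
            batch = []
    if batch:  # final, partially filled batch
        yield {'Records': batch, 'StreamName': stream_name}
```

With client = boto3.client('kinesis'), each yielded dict can then be unpacked into the call, for example: for kwargs in build_put_records_requests(questions, 'LoadtestKinesis', 'value'): client.put_records(**kwargs). Unpacking with ** is what satisfies the keyword-only requirement from the error message.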