我刚开始使用 Kinesis,其 API 可在此处获得
我已经用它将100 条记录推送到kinesis
for (int j = 0; j < 100; j++) {
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(myStreamName);
putRecordRequest.setData(ByteBuffer.wrap(data.getBytes()));
putRecordRequest.setPartitionKey(String.format("partitionKey-%d", j));
PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
System.out.println("Successfully putrecord, partition key : " + putRecordRequest.getPartitionKey()
+ ", ShardID : " + putRecordResult.getShardId() + ", Sequence No : "+ putRecordResult.getSequenceNumber());
}
Run Code Online (Sandbox Code Playgroud)
现在我想获取被推送的记录数。为此,我正在使用它:
Iterator<Shard> shardIterator = getTotalShardsIterator();//Implemented and giving perfectly all the shards.....
Run Code Online (Sandbox Code Playgroud)
现在使用上面的迭代器,我得到的计数为:
.....
while (shardIterator.hasNext()) {
Shard shard = shardIterator.next();
String shardId = …Run Code Online (Sandbox Code Playgroud)