AWS Kinesis NextShardIterator 永远不会为空

Tuğ*_*rli 3 amazon-web-services amazon-kinesis .net-core

上下文:我正在尝试使用 API 引用从 Kinesis 流中获取记录。我正在使用.Net Core(3.1版本)。

我正在使用 API 将数据写入 Kinesis Stream。这个问题没有任何问题。但我在读取数据时遇到一些问题。我将 getRecord 方法放在 do-while 循环中。while 的条件是 nextShardIterator 值是否为 null?但这个值永远不会变空。我无法打破循环。

一些答案包括这样的短语:“NextShardIterator 分片中开始顺序读取数据记录的下一个位置。如果设置为 null,则分片已关闭,并且请求的迭代器不再返回任何数据。”

我只有 1 个流和 1 个分片。我放了2条记录。之后,我执行读取方法。它正在获取这些记录。但此后,即使记录被消耗,nextShardIterator也永远不会为空。

  • 列出流(我只有 1 个流)
  • 描述流并获取分片(我只有 1 个分片)
  • 获取 TRIM_HORIZON 类型的 ShardIterator
  • 循环中的 GetRecord (while{if nextShardIterator != null})
  • 找到记录后,我尝试使用 AFTER_SEQUENCE 分片迭代器
  • 如果它无法获取任何记录,我正在尝试使用最新的分片迭代器

putRecord代码是这样的:

public async Task<ResponseModel> PutRecord(string orderFlow, string documentId)
        {
            byte[] bytedata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(orderFlow));
            using MemoryStream memoryStream = new MemoryStream(bytedata);
            var putRecordRequest = new PutRecordRequest();
            putRecordRequest.StreamName = myStreamName;
            putRecordRequest.PartitionKey = "partition" + documentId;
            putRecordRequest.Data = memoryStream;
            try
            {
                var putRecordResponse = await kinesisClient.PutRecordAsync(putRecordRequest);
                return new ResponseModel
                {
                    Data = putRecordResponse,
                    Status = ResponseStatus.Success,
                    Message = "Successfully put record!"
                };
            }
            catch (Exception e)
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Error,
                    Message = "PutRecord Error: " + e.Message
                };
            }
        }
Run Code Online (Sandbox Code Playgroud)

但我在读取数据时遇到一些问题。GetRecord 代码是这样的:

public async Task<ResponseModel> GetRecords()
        {
            var listStreams = await ListStreams();
            if (listStreams.StreamNames.Count == 0)
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "Do not have any Stream!"
                };
            }
            myStreamName = listStreams.StreamNames[0];

            var describeStreams = await DescribeStream(listStreams.StreamNames[0]);
            if (describeStreams.StreamDescription.StreamStatus != "ACTIVE")
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "Stream status: " + describeStreams.StreamDescription.StreamStatus
                };
            }

            var shards = describeStreams.StreamDescription.Shards;
            if (shards.Count == 0)
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "Do not have any Shard (or data)!"
                };
            }
            var shardId = shards[0].ShardId;

            var shardIterator = await GetShardIterator(shardId);
            if (string.IsNullOrWhiteSpace(shardIterator.ShardIterator))
            {
                return new ResponseModel
                {
                    Data = null,
                    Status = ResponseStatus.Warning,
                    Message = "ShardIterator is null or empty!"
                };
            }

            var getRecords = await GetRecords(shardIterator.ShardIterator);
            Console.WriteLine("First Iterator: " + shardIterator);
            var dataList = new List<Record>();

            do
            {
                if (getRecords.Records.Count == 0)
                {
                    Console.WriteLine("Records are empty!");
                    var nextShardIterator = GetShardLatest(shardId).Result.ShardIterator;
                    getRecords = await GetRecords(nextShardIterator);
                    Console.WriteLine("Latest Iterator: " + nextShardIterator);
                }
                else
                {
                    Console.WriteLine("We have records!");
                    foreach (var record in getRecords.Records)
                    {
                        dataList.Add(record);
                    }
                    var nextShardIterator = GetShardIteratorWithSequence(shardId, getRecords.Records[getRecords.Records.Count-1].SequenceNumber).Result.ShardIterator;
                    Console.WriteLine("AfterSequence Iterator: " + nextShardIterator);
                    getRecords = await GetRecords(nextShardIterator);
                }
            } while (getRecords.NextShardIterator != null);

            return new ResponseModel
            {
                Data = dataList,
                Status = ResponseStatus.Success,
                Message = "Successfull"
            };
        }
Run Code Online (Sandbox Code Playgroud)

Par*_*fal 5

Kinesis 流是一个潜在的无限记录序列,可以随时由多个生产者添加。因此,打开流的分片迭代器永远不会为空。

如果您想在到达流的“末尾”时跳出循环,请查看GetRecordsMillisBehindLatest响应中的字段。引用文档:

值为零表示记录处理已完成,此时没有新记录需要处理。

但请注意,可能随时添加新记录。如果您确实跳出了循环,请务必保存SequenceNumber从您处理的最后一条记录返回的结果,以便您可以从上次中断的地方继续。