Nod*_*.JS 8 c# amazon-web-services amazon-kinesis .net-core
我正在使用AWS Kinesis与生产者和消费者进行试验,但问题是,尽管我们多次更改了发送的数据对象,但消费者仍继续收到我们产生的第一条消息(或记录)。此外,我们尝试了多个ShardIteratorType,但没有一个起作用。最新不会产生任何结果,所有其他都会产生相同的原始记录。
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Amazon;
using Amazon.Internal;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using BenchmarkRuleSetModel.Models;
using MongoDB.Driver;
using Newtonsoft.Json;
namespace ConsoleApp7
{
internal class Program
{
private static AmazonKinesisClient _client;
private static string _streamName;
static async Task ReadFromStream()
{
var kinesisStreamName = _streamName;
var describeRequest = new DescribeStreamRequest
{
StreamName = kinesisStreamName,
};
var describeResponse = await _client.DescribeStreamAsync(describeRequest);
var shards = describeResponse.StreamDescription.Shards;
foreach (var shard in shards)
{
var iteratorRequest = new GetShardIteratorRequest
{
StreamName = kinesisStreamName,
ShardId = shard.ShardId,
ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
Timestamp = DateTime.MinValue
};
var iteratorResponse = await _client.GetShardIteratorAsync(iteratorRequest);
var iteratorId = iteratorResponse.ShardIterator;
while (!string.IsNullOrEmpty(iteratorId))
{
var getRequest = new GetRecordsRequest
{
ShardIterator = iteratorId, Limit = 10000
};
var getResponse = await _client.GetRecordsAsync(getRequest);
var nextIterator = getResponse.NextShardIterator;
var records = getResponse.Records;
if (records.Count > 0)
{
Console.WriteLine("Received {0} records. ", records.Count);
foreach (var record in records)
{
var json = Encoding.UTF8.GetString(record.Data.ToArray());
Console.WriteLine("Json string: " + json);
}
}
iteratorId = nextIterator;
}
}
}
private static async Task<string> Produce()
{
var data = new
{
Message = "Hello world!",
Author = "Amir"
};
//convert to byte array in prep for adding to stream
var oByte = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));
using (var ms = new MemoryStream(oByte))
{
//create put request
var requestRecord = new PutRecordRequest
{
StreamName = _streamName,
PartitionKey = Guid.NewGuid().ToString(),
Data = ms
};
//list name of Kinesis stream
//give partition key that is used to place record in particular shard
//add record as memorystream
//PUT the record to Kinesis
var response = await _client.PutRecordAsync(requestRecord);
return response.SequenceNumber;
}
}
static void Main(string[] args)
{
_client = new AmazonKinesisClient("ExampleKey", "ExampleSecret", RegionEndpoint.EUWest2);
_streamName = "SomeStream";
Produce().Wait();
ReadFromStream().Wait();
}
}
}
Run Code Online (Sandbox Code Playgroud)
首先,当我调试了您的代码时,我注意到它在内循环 ( while (!string.IsNullOrEmpty(iteratorId))) 中无限循环,并且永远不会循环流中的所有分片(假设您有 >1)。原因在https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#getrecords-returns-empty中进行了解释- 因为生产者从未调用过MergeShards或SplitShards,它们保持打开状态,因此NextShardIterator永远不会NULL。
这就是为什么您只看到记录放在第一个分片上(或者至少我在运行代码时看到) - 您必须并行地从分片中读取。
就您的使用模式而言,您正在使用:
ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
Timestamp = DateTime.MinValue
Run Code Online (Sandbox Code Playgroud)
通过这种方式,您实际上是在告诉 Kinesis“从一开始就向我提供流中的所有记录”(或至少直到保留期为止)。这就是为什么除了新记录之外,您还不断看到相同的旧记录(这也是我运行代码时看到的)。
调用GetRecords[Async]实际上不会从流中删除记录(请参阅/sf/answers/1801891311/)。使用 Kinesis 的正确方法是逐个检查点移动。如果消费者要保留SequenceNumber最后读取的记录,然后重新启动,如下所示:
ShardIteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER,
StartingSequenceNumber = lastSeenSequenceNumber
Run Code Online (Sandbox Code Playgroud)
然后您只会看到较新的记录。
| 归档时间: |
|
| 查看次数: |
253 次 |
| 最近记录: |