解释Kinesis Shard Iterator - AWS Java SDK

Yuv*_*ger 9 java scala amazon-web-services amazon-kinesis

好的,我将从精心设计的用例开始,并解释我的问题:

  1. 我使用第三方网站分析平台,该平台利用AWS Kinesis流将数据从客户端传递到最终目的地 - 一个Kinesis流;
  2. 网站分析平台使用2个流:
    1. 数据收集器流(单个分片流);
    2. 第二个流,用于丰富来自收集器流的原始数据(单个分片流); 最重要的是,此流使用TRIM_HORIZON迭代器类型消耗第一个流中的原始数据;
  3. 我使用AWS Java SDK从流中使用数据,并使用GetShardIteratorRequest该类;
  4. 我正在开发提取类,所以这是同步完成的,这意味着我只在编译我的类时才使用数据;
  5. 该类令人惊讶地工作,虽然有一些我无法理解的东西,特别是关于如何从流中消耗数据以及每个迭代器类型的含义;

我的问题是我检索的数据不一致,并且没有按时间顺序排列的逻辑.

  • 当我使用AT_SEQUENCE_NUMBER并提供碎片中的第一个序列号时

    ..getSequenceNumberRange()getStartingSequenceNumber();

    ......作为``,我没有得到所有记录.同样地,AFTER_SEQUENCE_NUMBER;

  • 当我使用时LATEST,我的结果为零;
  • 当我使用时TRIM_HORIZON,它应该有意义使用,它似乎没有正常工作.它曾经为我提供数据,然后我添加了新的"事件"(记录到最后一个流),我收到了零记录.神秘.

我的问题是:

  1. 如何安全地使用流中的数据,而不必担心错过记录?
  2. 有替代品ShardIteratorRequest吗?
  3. 如果有,我怎么才能"浏览"流并查看其中的内容以进行调试引用?
  4. 这个TRIM_HORIZON方法我错过了什么?

在此先感谢,我真的很想从Kinesis流中学到更多关于数据消耗的知识.

Buz*_*are 6

我理解上面的困惑,我也遇到了同样的问题,但我想我现在已经弄清楚了。请注意,我在没有 KCL 的情况下直接使用JSON API

我似乎在客户端开始使用流时,API 为客户端提供了 2 个基本的迭代器选择:

A) TRIM_HORIZON:用于读取延迟在几分钟(甚至几小时)到 24 小时之前的 PAST 记录。它不会返回最近放置的记录。在此迭代器看到的最后一条记录上使用 AFTER_SEQUENCE_NUMBER 会返回一个空数组,即使记录最近已被 PUT。

B) LATEST:用于实时读取 FUTURE 记录(在 PUT 后立即读取)。我被我能在这个“在分片中的最新记录之后开始阅读,以便您始终阅读分片中的最新数据”的唯一一句话文档所欺骗。您得到的是一个空数组,因为自从获得迭代器以来没有任何记录被 PUT。如果您获得这种类型的迭代器,然后 PUT 一条记录,该记录将立即可用。

最后,如果您知道最近放置的记录的序列 ID,您可以使用 AT_SEQUENCE_NUMBER 立即获取它,并且您可以使用 AFTER_SEQUENCE_NUMBER 获取稍后的记录,即使它们不会出现在 TRIM_HORIZON 迭代器中。

上面确实意味着如果你想实时读取所有已知的过去记录和未来记录,你必须使用 A 和 B 的组合,用逻辑来处理它们之间的记录(最近的过去)。KCL 可以很好地解决这个问题。

  • AWS 无法创建一个像样的 API 来拯救自己。对于我的下一个项目,我将迁移到 Google Cloud。情况再糟糕不过了。 (2认同)