Kinesis Shard GetRecords.IteratorAgeMilliseconds 达到最大值 86.4M(1 天)并且即使消耗也不会减少

Gre*_*ret 5 apache-spark amazon-kinesis spark-streaming amazon-kcl

  • 我正在使用 Spark Streaming 2.2.0 和spark-streaming-kinesis-asl_2.11使用 Kinesis 流。
  • Kinesis Stream 有 150 个分片,我正在监控GetRecords.IteratorAgeMillisecondsCloudWatch 指标以查看消费者是否跟上流。
  • Kinesis Stream 的默认数据保留时间为 86400 秒(1 天)。
  • 我正在调试一个案例,其中几个 Kinesis Shards 达到最大值GetRecords.IteratorAgeMilliseconds86400000(== 保留期)
  • 这仅适用于某些分片(我们称它们为过时分片),而不是所有分

我已经确定了过时分片的shardIds 。其中之一是shardId-000000000518,我可以在 DynamoDB 表中看到包含以下检查点信息:

leaseKey: shardId-000000000518
checkpoint: 49578988488125109498392734939028905131283484648820187234
checkpointSubSequenceNumber: 0
leaseCounter: 11058
leaseOwner: 10.0.165.44:52af1b14-3ed0-4b04-90b1-94e4d178ed6e    
ownerSwitchesSinceCheckpoint: 37
parentShardId: { "shardId-000000000269" }
Run Code Online (Sandbox Code Playgroud)

我可以在 10.0.165.44 上的工人日志中看到以下内容:

17/11/22 01:04:14 INFO Worker:当前流分片分配:shardId-000000000339, ..., shardId-000000000280, shardId-0000000000518

...这应该意味着shardId-000000000518被分配给这个工人。但是,我从未在此 shardId 的日志中看到任何其他内容。如果工作人员没有从这个 shardId 消费(但它应该),这可以解释为什么GetRecords.IteratorAgeMilliseconds永远不会减少。对于其他一些(非过时的 shardIds),我可以在日志中看到

17/11/22 01:31:28 INFO SequenceNumberValidator:已验证序列号 49578988151227751784190049362310810844771023726275728690 分片 ID 00300303003

我确实通过查看 IncomingRecords CloudWatch 指标来验证过时的分片是否有数据流入其中。

我该如何调试/解决这个问题?为什么这些 shardId 永远不会被 Spark 工作人员接收到?