Kinesis:关闭工人的最佳/安全方法是什么?

Kal*_*nam 5 java multithreading amazon-web-services amazon-kinesis aws-sdk

我正在使用AWS Kinesis客户端库.

我需要一种方法在部署期间关闭Kinesis Worker线程,这样我就停在检查点而不是在中间processRecords().

我看到一个关闭布尔存在,Worker.java但它是私有的.

我需要的原因是检查点和幂等性对我来说至关重要,我不想在批处理中间杀死进程.

[编辑]

感谢@CaptainMurphy,我注意到Worker.java曝光shutdown()方法可以安全地关闭工人和工人LeaseCoordinator.它没有做的是调用shutdown()任务IRecordProcessor.它突然终止而IRecordProcessor不用担心国家.

我确实理解KCL不保证检查点之间的幂等性,开发人员应该使设计容错,但我觉得IRecordProcessor应该在LeaseCoordinator停止之前正确关闭,无论如何.

Chr*_*ell 2

当您在 Worker 上调用 shutdown 时,记录处理器关闭方法确实会被调用。可以从ShutdownTask类追溯到,该类是由ShardConsumer类创建的,而该类是由Worker关闭的。

因此,您可以通过侦听关闭调用来在最后接收的记录处设置检查点,将检查点传递给最后一个处理值的迭代点处的最后一个序列接收函数。例如在您重写的 processRecords() 中:

for(Record currRecord : records)
{
    someProcessSingleRecordMethod(currRecord)
    if(shutdown) 
    { 
        checkpointer.checkpoint(currRecord.getSequenceNumber()); 
        return; 
    } 
}
Run Code Online (Sandbox Code Playgroud)

您的关闭方法将关闭标志设置为 true。

请注意,在非正常关闭(例如实例终止)的情况下,以“至少一次”方式设计 Kinesis 应用程序仍然是最佳实践。对于 Kinesis 来说,“仅接收和处理一次”可能不是一个好的用例。