AWS Kinesis - 如何从上一个检查点恢复消费

Ben*_*son 7 java amazon-web-services amazon-kinesis

我正在使用 KCL (v2) 将 Kafka 消费者转换为 AWS Kinesis 消费者。在 Kafka 中,偏移量用于帮助消费者跟踪其最近使用的消息。如果我的 Kafka 应用程序死掉,它将使用偏移量从它重新启动时停止的地方开始消费。

然而,这在 Kinesis 中并不相同。我可以设置,kinesisClientLibConfiguration.withInitialPositionInStream(...)但唯一的参数是TRIM_HORIZON,LATESTAT_TIMESTAMP。如果我的 Kinesis 应用程序死掉,它在重新启动时将不知道从哪里恢复消费。

我的 KCL 消费者非常简单。该main()方法看起来像:

KinesisClientLibConfiguration config = new KinesisClientLibConfiguration("benTestApp",
            "testStream", new DefaultAWSCredentialsProviderChain(), UUID.randomUUID().toString());
config.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

Worker worker = new Worker.Builder()
            .recordProcessorFactory(new KCLRecordProcessorFactory())
            .config(config)
            .build();
Run Code Online (Sandbox Code Playgroud)

RecordProcessor是一个简单的实现:

@Override
public void initialize(InitializationInput initializationInput) {
    LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    List<Record> records = processRecordsInput.getRecords();
    LOGGER.info("Retrieved {} records", records.size());
    records.forEach(r -> LOGGER.info("Record: {}", StandardCharsets.UTF_8.decode(r.getData())));
}

@Override
public void shutdown(ShutdownInput shutdownInput) {
    LOGGER.info("Shutting down input");
}
Run Code Online (Sandbox Code Playgroud)

如果我检查相应的 DynamoDB 表, 的值checkpoint设置为TRIM_HORIZON,并且不会在记录被消耗时使用 sequenceIds 更新。

确保我使用每条消息的解决方案是什么?

Ben*_*son 2

正如 @kdgregory 所指出的,KCL 要求用户设置自己的检查点。工作代码:

@Override
public void initialize(InitializationInput initializationInput) {
    LOGGER.info("Initializing record processor for shard: {}", initializationInput.getShardId());
}

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    List<Record> records = processRecordsInput.getRecords();
    LOGGER.info("Retrieved {} records", records.size());
    records.forEach(r -> LOGGER.info("Record with sequenceId {} at date {} : {}", r.getSequenceNumber(),
            r.getApproximateArrivalTimestamp(), StandardCharsets.UTF_8.decode(r.getData())));
    try {
        processRecordsInput.getCheckpointer().checkpoint();
    } catch (InvalidStateException | ShutdownException e) {
        LOGGER.error("Unable to checkpoint");
    }
}

@Override
public void shutdown(ShutdownInput shutdownInput) {
    LOGGER.info("Shutting down input");
    try {
        shutdownInput.getCheckpointer().checkpoint();
    } catch (InvalidStateException | ShutdownException e) {
        LOGGER.error("Unable to checkpoint");
    }
}
Run Code Online (Sandbox Code Playgroud)