How to use requestShutdown and shutdown for a graceful shutdown with the KCL Java library for AWS Kinesis

dav*_*ing 3 java aws-java-sdk amazon-kcl amazon-kinesis-firehose

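I am trying to use the new feature of the KCL library in Java to shut down an AWS Kinesis consumer gracefully: I register a shutdown hook that stops all record processors and then stops the worker gracefully. The new library provides a new interface that record processors need to implement. But how does it get invoked?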

I tried calling worker.requestShutdown() first and then worker.shutdown(), and it worked. But is this the intended way to use them? What is each one for, and what is the benefit?

pra*_*upd 5

Starting the consumer

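As you may know, when you create a Worker, it

1) creates the consumer offset (lease) table in DynamoDB

2) creates the leases, and schedules the lease taker and the lease renewer at the configured intervals

If you have two shards, the same DynamoDB table will contain two records, i.e. one lease per shard.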

For example:

{
  "checkpoint": "TRIM_HORIZON",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 38,
  "leaseKey": "shardId-000000000000",
  "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
  "ownerSwitchesSinceCheckpoint": 0
}

{
  "checkpoint": "49570828493343584144205257440727957974505808096533676050",
  "checkpointSubSequenceNumber": 0,
  "leaseCounter": 40,
  "leaseKey": "shardId-000000000001",
  "leaseOwner": "ComponentTest_Consumer_With_Two_Partitions_Consumer_192.168.1.83",
  "ownerSwitchesSinceCheckpoint": 0
}
  • Taking and renewing leases on schedule is handled by the lease coordinator's ScheduledExecutorService (called leaseCoordinatorThreadPool).

3) Then, for each shard it holds a lease on, the Worker creates an internal ShardConsumer, which actually fetches the events and dispatches them to your RecordProcessor#processRecords. See ProcessTask#call.

4) As for your question: you register your IRecordProcessorFactory implementation with the Worker, which uses it to create one IRecordProcessor for each ShardConsumer.
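Steps 1) and 2) come down to two periodic jobs on one scheduled executor. The following is a minimal, stdlib-only sketch of that scheduling pattern, not KCL's actual lease coordinator: `ToyLeaseCoordinator`, its counters and the intervals are hypothetical, and real lease taking and renewal read and write the DynamoDB lease table. Its `stop` method mirrors what `shutdown` is described as doing in the "Stopping the consumer" part: stop the pool, then wait for it to terminate.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the lease scheduling pattern; NOT KCL's LeaseCoordinator.
class ToyLeaseCoordinator {
    // Plays the role of leaseCoordinatorThreadPool (hypothetical stand-in).
    private final ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
    // Counters stand in for real work against the DynamoDB lease table.
    final AtomicInteger takerRuns = new AtomicInteger();
    final AtomicInteger renewerRuns = new AtomicInteger();

    void start(long takerIntervalMillis, long renewerIntervalMillis) {
        // Lease taker: periodically tries to acquire leases that are unowned or expired.
        pool.scheduleAtFixedRate(takerRuns::incrementAndGet, 0, takerIntervalMillis, TimeUnit.MILLISECONDS);
        // Lease renewer: periodically extends the leases this worker already holds.
        pool.scheduleAtFixedRate(renewerRuns::incrementAndGet, 0, renewerIntervalMillis, TimeUnit.MILLISECONDS);
    }

    // Stop the pool (periodic jobs are cancelled by default), then wait for termination.
    boolean stop(long timeoutMillis) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    boolean isStopped() {
        return pool.isTerminated();
    }
}
```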

For example, here is a consumer set up along the lines of step 4, which may help:

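import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import java.nio.charset.StandardCharsets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

// getAuthProfileCredentials() and getHttpConfiguration() are your own helpers,
// returning an AWSCredentialsProvider and a ClientConfiguration respectively.
KinesisClientLibConfiguration streamConfig = new KinesisClientLibConfiguration(
        "consumerName", "streamName", getAuthProfileCredentials(), "consumerName-" + "consumerInstanceId")
        .withKinesisClientConfig(getHttpConfiguration())
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); // TRIM_HORIZON = oldest available record (LATEST = tip of the stream)

Worker consumerWorker = new Worker.Builder()
        .recordProcessorFactory(new DavidsEventProcessorFactory())
        .config(streamConfig)
        // the builder takes the low-level AmazonDynamoDB client, not the document-API DynamoDB wrapper
        .dynamoDBClient(new AmazonDynamoDBClient(getAuthProfileCredentials(), getHttpConfiguration()))
        .build();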


public class DavidsEventProcessorFactory implements IRecordProcessorFactory {

    private Logger logger = LogManager.getLogger(DavidsEventProcessorFactory.class);

    @Override
    public IRecordProcessor createProcessor() {
        logger.info("Creating an EventProcessor.");
        return new DavidsEventPartitionProcessor();
    }
}

class DavidsEventPartitionProcessor implements IRecordProcessor {

    private Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class);

    //TODO add consumername ?

    private String partitionId;

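    // TERMINATE: the shard was closed (e.g. after resharding) and all its records were delivered
    private static final ShutdownReason RE_PARTITIONING = ShutdownReason.TERMINATE;

    public DavidsEventPartitionProcessor() {
    }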

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.partitionId = initializationInput.getShardId();
        logger.info("Initialised partition {} for streaming.", partitionId);
    }

    @Override
    public void processRecords(ProcessRecordsInput recordsInput) {
        recordsInput.getRecords().forEach(nativeEvent -> {
            // decode explicitly as UTF-8; ByteBuffer.array() returns the whole backing array
            // (ignoring position/limit) and may fail for read-only buffers
            String eventPayload = StandardCharsets.UTF_8.decode(nativeEvent.getData()).toString();
            logger.info("Processing an event {} : {}", nativeEvent.getSequenceNumber(), eventPayload);
        });

        // checkpoint() persists the position of the last record in this batch,
        // so one call per batch is enough (each call is a write to the DynamoDB table)
        try {
            recordsInput.getCheckpointer().checkpoint();
            logger.debug("Persisted the consumer offset for partition {}", partitionId);
        } catch (InvalidStateException e) {
            logger.error("Cannot update consumer offset to the DynamoDB table.", e);
        } catch (ShutdownException e) {
            logger.error("Consumer shutting down", e);
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        logger.debug("Shutting down event processor for {}", partitionId);

        // Checkpoint only on TERMINATE; on ZOMBIE the lease now belongs to another worker
        if (shutdownInput.getShutdownReason() == RE_PARTITIONING) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (InvalidStateException e) {
                logger.error("Cannot update consumer offset to the DynamoDB table.", e);
            } catch (ShutdownException e) {
                logger.error("Consumer shutting down", e);
            }
        }
    }

}

// then start the consumer (Worker implements Runnable; run() blocks until the worker is shut down)

consumerWorker.run();
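The processor above keeps partitionId in a plain instance field. That works because, as in step 4, the Worker asks the factory for a fresh processor for each shard it consumes, so one instance never serves two shards. A stdlib-only sketch of that contract, with all names hypothetical: `ToyRecordProcessor` stands in for IRecordProcessor, a `Supplier` for IRecordProcessorFactory, and `ToyWorker.assignShards` for the Worker creating one ShardConsumer per leased shard.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for KCL's IRecordProcessor.
interface ToyRecordProcessor {
    void initialize(String shardId);
    String shardId();
}

// Hypothetical stand-in for the Worker: exactly one processor instance per leased shard.
class ToyWorker {
    private final Supplier<ToyRecordProcessor> factory; // plays the role of IRecordProcessorFactory
    private final Map<String, ToyRecordProcessor> processorsByShard = new LinkedHashMap<>();

    ToyWorker(Supplier<ToyRecordProcessor> factory) {
        this.factory = factory;
    }

    // Creates and initializes a processor the first time a shard is seen; reuses it afterwards.
    Map<String, ToyRecordProcessor> assignShards(List<String> leasedShardIds) {
        for (String shardId : leasedShardIds) {
            processorsByShard.computeIfAbsent(shardId, id -> {
                ToyRecordProcessor p = factory.get(); // like createProcessor()
                p.initialize(id);                     // like initialize(InitializationInput)
                return p;
            });
        }
        return processorsByShard;
    }
}

class ShardAwareProcessor implements ToyRecordProcessor {
    private String shardId; // safe as a field: each instance serves exactly one shard

    @Override
    public void initialize(String shardId) {
        this.shardId = shardId;
    }

    @Override
    public String shardId() {
        return shardId;
    }
}
```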

Stopping the consumer

Now, when you want to stop the consumer instance (the Worker), you don't need to do much with the individual ShardConsumers; they are taken care of once you ask the Worker to shut down.

  • shutdown asks the leaseCoordinatorThreadPool, which is responsible for renewing and taking leases, to stop, and waits for it to terminate.

  • requestShutdown, on the other hand, cancels the lease taker and notifies the ShardConsumers about the shutdown.

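More importantly, if you want your RecordProcessor to be notified by requestShutdown, you can also implement IShutdownNotificationAware. That way, if there is a race where your RecordProcessor is in the middle of processing events while the worker is about to shut down, you should still be able to commit your offset and then shut down.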

requestShutdown returns a ShutdownFuture, which then calls back into worker.shutdown.
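To make the ordering of the two calls concrete, here is a toy, stdlib-only model. It is not KCL's Worker: `ToyGracefulWorker`, `ToyShutdownAware` and the in-memory checkpoint list are hypothetical, and real KCL also deals with leases, per-shard threads and timeouts. `shutdown()` simply stops; `requestShutdown()` stops taking leases, gives every notification-aware processor a chance to checkpoint, and only then runs the same `shutdown()` from the returned future.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for IShutdownNotificationAware: the processor may checkpoint here.
interface ToyShutdownAware {
    void shutdownRequested(List<String> checkpoints);
}

// Toy model of the two shutdown paths; NOT the KCL Worker.
class ToyGracefulWorker {
    private final List<ToyShutdownAware> processors;
    // Stands in for offsets persisted to the DynamoDB lease table.
    final List<String> checkpoints = Collections.synchronizedList(new ArrayList<>());
    volatile boolean takingLeases = true;
    volatile boolean stopped = false;

    ToyGracefulWorker(List<ToyShutdownAware> processors) {
        this.processors = processors;
    }

    // Like shutdown(): stop the lease machinery; processors get no chance to checkpoint.
    void shutdown() {
        takingLeases = false;
        stopped = true;
    }

    // Like requestShutdown(): stop taking leases, notify each processor, then shutdown().
    // Returns a CompletableFuture (a Future<Void>) so callers can join() it.
    CompletableFuture<Void> requestShutdown() {
        takingLeases = false; // "cancels the lease taker" right away
        return CompletableFuture.runAsync(() -> {
            for (ToyShutdownAware p : processors) {
                p.shutdownRequested(checkpoints); // last chance to commit an offset
            }
            shutdown(); // then the same hard stop as shutdown()
        });
    }
}
```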

To be notified of requestShutdown, implement the following method on your RecordProcessor:

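import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;

class DavidsEventPartitionProcessor implements IRecordProcessor, IShutdownNotificationAware {

    private Logger logger = LogManager.getLogger(DavidsEventPartitionProcessor.class);

    private String partitionId;

    // initialize, processRecords and shutdown as above

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        logger.debug("Shutdown requested for {}", partitionId);
        // last chance to commit the offset while this worker still holds the lease
        try {
            checkpointer.checkpoint();
        } catch (InvalidStateException | ShutdownException e) {
            logger.error("Cannot checkpoint partition {} before shutdown.", partitionId, e);
        }
    }

}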

However, if the processor loses its lease before the notification arrives, shutdownRequested might not be called.

Summary of your questions

The new library provides a new interface that record processors need to implement. But how does it get invoked?

  • Implement an IRecordProcessorFactory and an IRecordProcessor.
  • Then wire your RecordProcessorFactory into your Worker.

I tried calling worker.requestShutdown() first and then worker.shutdown(), and it worked. But is there an intended way to use them?

You should use requestShutdown() for a graceful shutdown; it takes care of the race condition. It was introduced in kinesis-client-1.7.1.
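For the original goal of the question (a JVM shutdown hook), one possible shape is sketched below. It assumes, as described above, that the future returned by worker.requestShutdown() completes once the processors have been notified and worker.shutdown() has run; if that future fails or exceeds the timeout, the hook falls back to worker.shutdown(). The class `KinesisShutdownHooks`, the method `gracefulShutdownHook` and the timeout are hypothetical, and the two worker calls are passed in as functional arguments so the snippet compiles without KCL on the classpath.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper; not part of KCL.
final class KinesisShutdownHooks {
    private KinesisShutdownHooks() {
    }

    // Builds a thread for Runtime.addShutdownHook.
    // requestShutdown ~ worker::requestShutdown, hardShutdown ~ worker::shutdown.
    static Thread gracefulShutdownHook(Supplier<? extends Future<?>> requestShutdown,
                                       Runnable hardShutdown,
                                       long timeout, TimeUnit unit) {
        return new Thread(() -> {
            Future<?> done = requestShutdown.get();
            try {
                // wait for processors to checkpoint and the worker to stop
                done.get(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                hardShutdown.run();
            } catch (ExecutionException | TimeoutException e) {
                // graceful path failed or took too long: stop anyway
                hardShutdown.run();
            }
        }, "kinesis-graceful-shutdown");
    }
}
```

With the consumerWorker from the example above, registration might look like `Runtime.getRuntime().addShutdownHook(KinesisShutdownHooks.gracefulShutdownHook(consumerWorker::requestShutdown, consumerWorker::shutdown, 20, TimeUnit.SECONDS));`. The timeout bounds how long JVM exit can be delayed.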