Camel 应用程序重新处理 Kinesis 记录

Jul*_*tte 6 java apache-camel amazon-kinesis kubernetes

问题:

我有一个在 Kubernetes 中运行的 Java 应用程序的三个实例。我的应用程序使用 Apache Camel 从 Kinesis 流中读取数据。我目前正在观察两个相关问题:

  1. 我的应用程序的三个正在运行的实例中的每一个都在处理进入流的记录,当我只希望每个记录被处理一次(我希望三个启动并运行以进行扩展)。我希望当一个实例正在处理记录 A 时,第二个实例可能正在获取记录 B,等等。

  2. 每次我的应用程序在 Kubernetes 中重新部署时,每个实例都会重新启动每条记录(换句话说,它不知道它在哪里停止或以前处理过哪些记录)。

  3. 5 分钟后,我的应用程序用于轮询 kinesis 的分片迭代器超时。我知道这是正常行为,但我不明白为什么我的应用程序没有获取新的迭代器。此屏幕截图显示了来自 DataDog 的错误。来自 DataDog 的错误

我的尝试: 首先,我认为这个问题是由我的应用程序的三个实例和部署中的分片迭代器 id 和 kinesis 消费者 id 不一致引起的。但是,我一直无法找到这些值在代码中的设置位置,以及如何设置它们。当然,也可能有更好的解决方案。我发现关于 Kinesis/Kubernetes/Camel 协同工作的文档很少,因此很少有外部资源有帮助。

AWS Kinesis :: Apache Camel上的文档非常有限,但我尝试使用迭代器类型并构建自定义客户端配置

如果您需要任何其他信息,请告诉我,谢谢。

配置客户端:

main.bind("kinesisClient", AmazonKinesisClientBuilder.defaultClient());
        .
        .
        .
    
inputUri = String.format("aws-kinesis://%s?amazonKinesisClient=#kinesisClient", rawKinesisName);
    
main.configure().addRoutesBuilder(new RawDataRoute(inputUri, inputTransform));
Run Code Online (Sandbox Code Playgroud)

我的路线:

public class RawDataRoute extends RouteBuilder {
    private static final Logger LOG = new Logger(RawDataRoute.class, true);
    private String rawDataStreamUri;
    private Expression transform;

    public RawDataRoute(final String rawDataStreamUri, final Expression transform) {
        this.rawDataStreamUri = rawDataStreamUri;
        this.transform = transform;
    }

    @Override
    public void configure() {
        // TODO add error handling
        from(rawDataStreamUri)
            .routeId("raw_data_stream")
            .transform(transform)
            .to("direct:main_input_stream");
    }

}
Run Code Online (Sandbox Code Playgroud)