阅读并处理来自Kafka的一批消息

bm1*_*729 5 java apache-kafka apache-kafka-streams apache-kafka-connect

我想从kafka主题中定期读取一批消息,或者当读取的消息数达到一定数量时,将它们作为批处理发送到下游系统.目前,我的kafka拓扑结构由处理器终止,处理器保存消息,然后使用punctuate方法逐步处理批处理.

但是,我不确定这是完美的,因为如果应用程序在调用punctuate方法之前崩溃,我认为一些消息会丢失(即消费者认为它已经完成了它们但它们不会出现在下游系统中) .

batchQueue = new LinkedBlockingQueue<String>(batchSize);

KStream<String, String> inputStream = builder
    .stream(Serdes.String(), Serdes.String(), "source-topic")
    .process(new ProcessorSupplier<String, String>() {

            @Override
            public Processor<String, String> get() {
                return new AbstractProcessor<String, Wrapper>() {

                    @Override
                    public void init(ProcessorContext context) {
                        super.init(context);
                        context.schedule(flushPeriod);
                    }

                    @Override
                    public void process(String key, String value) {
                        batchQueue.add(value);
                        if (batchQueue.remainingCapacity() == 0) {
                            processQueue();
                        }
                    }

                    @Override
                    public void punctuate(long timestamp) {
                        processQueue();
                        context().commit();
                    }
                }

                @Override
                public void close() {}
            };
        }
    });
Run Code Online (Sandbox Code Playgroud)

Mic*_*oll 4

有没有办法让这种方法更加稳健?也许是窗口化,但我不太明白这一点。

我建议将数据转换部分(我将使用 Kafka 的 Streams API)和写入下游系统的数据摄取部分(我将使用 Kafka 的 Connect API)分离。

简而言之,为什么您的转换逻辑应该与该数据最终转发到的下游系统之一的具体细节(此处:昂贵的插入!)相结​​合?理想情况下,转型的责任应该只是转型,而不应该涉及外部下游系统的运营方面。例如,如果您最终想要将转换后的数据转发到第二个下游系统(或第三个,...),那么耦合方法将意味着您必须更新/重新部署/...您的应用程序,甚至尽管它的转换逻辑没有改变。

解耦转换和摄取的另一个好处是,您的转换逻辑将更加简单,因为它不必考虑由于下游系统缓慢、不可用等而导致的失败。例如,它不需要实现/测试复杂的重试逻辑。

我必须使用 Kafka connect 为此吗?

不,您不需要为此使用 Kafka Connect,但它可以说是完成此任务的最佳工具。

我倾向于远离[Kafka Connect],因为它的错误处理能力:https://groups.google.com/forum/#!topic/confluence- platform/OBuLbVHbuyI

在最新版本的 Kafka Connect 中,错误处理实际上相当不错。此外,通过提供更强大的转换器(例如:串行器/解串器)供 Connect 使用,可以轻松解决链接讨论中的问题 IIRC。

此外,正如该链接中提到的,当您在将数据写入 Kafka 之前验证数据的兼容性时,那里讨论的具体问题就变得不那么重要了。您可以通过利用 Confluence 的模式注册表(https://github.com/confluenceinc/schema-registry文档或类似工具)来实现这一点。既然您提出了“如何使其更加健壮”的问题,请考虑数据序列化和演化这是我在部署到生产之前要考虑的另一个重要方面。

希望这可以帮助!