将Kafka消费者和生产者集成到一个功能中

Hir*_*ala 3 scala apache-kafka kafka-consumer-api apache-flink kafka-producer-api

我们需要开发一个代码,让消费者在运行时侦听特定的kafka生产者,然后在同一函数中产生从当前消耗的数据到另一个生产者主题的已处理数据。

这是为了将代码与Java集成在一起,使Java将代码链接到Java,在Java中Java生成到一个主题的消息,而Flink使用它并为另一个主题生成新数据,以供Java进一步处理。

请让我们知道是否还有另一种方法可以执行此过程。

Dav*_*son 5

Flink与Kafka很好地集成在一起,并且可以根据需要利用Kafka事务。这样的应用程序看起来像这样:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer011<> consumer = new FlinkKafkaConsumer011<IN>(topic_in, serializer_in, kafkaProperties);
FlinkKafkaProducer011<> producer = new FlinkKafkaProducer011<OUT>(broker, topic_out, serializer_out)

env.addSource(consumer)
   .map(new SuitableTransformation())
   .addSink(producer)
   .execute()
Run Code Online (Sandbox Code Playgroud)