小编Mik*_*ike的帖子

如何从程序中停止flink流式传输作业

我正在尝试为Flink流作业创建一个JUnit测试,该作业将数据写入kafka主题FlinkKafkaProducer09FlinkKafkaConsumer09分别使用和从相同的kafka主题读取数据.我正在传递产品中的测试数据:

DataStream<String> stream = env.fromElements("tom", "jerry", "bill");
Run Code Online (Sandbox Code Playgroud)

并检查相同的数据是否来自消费者:

List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result =  resultSink.getResult();
assertEquals(expected, result);
Run Code Online (Sandbox Code Playgroud)

使用TestListResultSink.

我可以通过打印流来查看来自消费者的数据.但无法获得Junit测试结果,因为消费者即使在消息完成后也会继续运行.所以它没有来测试部分.

是以任何方式进入FlinkFlinkKafkaConsumer09停止进程或运行特定时间?

junit apache-kafka apache-flink flink-streaming

8
推荐指数
1
解决办法
3021
查看次数