Mik*_*ike 8 junit apache-kafka apache-flink flink-streaming
我正在尝试为Flink流作业创建一个JUnit测试,该作业将数据写入kafka主题FlinkKafkaProducer09并FlinkKafkaConsumer09分别使用和从相同的kafka主题读取数据.我正在传递产品中的测试数据:
DataStream<String> stream = env.fromElements("tom", "jerry", "bill");
Run Code Online (Sandbox Code Playgroud)
并检查相同的数据是否来自消费者:
List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result = resultSink.getResult();
assertEquals(expected, result);
Run Code Online (Sandbox Code Playgroud)
使用TestListResultSink.
我可以通过打印流来查看来自消费者的数据.但无法获得Junit测试结果,因为消费者即使在消息完成后也会继续运行.所以它没有来测试部分.
是以任何方式进入Flink或FlinkKafkaConsumer09停止进程或运行特定时间?
潜在的问题是流媒体程序通常不是有限的并且无限期地运行.
至少在目前,最好的方法是在流中插入一条特殊的控制消息,让源正确终止(只需通过离开读取循环停止读取更多数据).这样Flink就会告诉所有下游运营商,他们可以在消耗完所有数据后停止运营.
或者,您可以在源中引发特殊异常(例如,在一段时间之后),以便您可以区分"正确"终止与故障情况(通过检查错误原因).在源代码中抛出异常将使程序失败.
| 归档时间: |
|
| 查看次数: |
3021 次 |
| 最近记录: |