如何正确处理自定义MapFunction中的错误?

Max*_*kov 8 apache-flink

我已经实现MapFunction了我的Apache Flink流程.它正在解析传入的元素并将它们转换为其他格式,但有时会出现错误(即传入的数据无效).

我看到了两种可能的处理方式:

  • 忽略无效元素,但似乎我不能忽略错误,因为对于任何传入元素我必须提供传出元素.
  • 将传入的元素拆分为有效和无效但似乎我应该使用其他功能.

所以,我有两个问题:

  1. 如何正确处理我的错误MapFunction
  2. 如何正确实现这样的转换功能?

Til*_*ann 7

你可以使用a FlatMapFunction而不是a MapFunction.这将允许您仅在元素有效时才发出元素.以下显示了一个示例实现:

input.flatMap(new FlatMapFunction<String, Long>() {
    @Override
    public void flatMap(String input, Collector<Long> collector) throws Exception {
        try {
            Long value = Long.parseLong(input);
            collector.collect(value);
        } catch (NumberFormatException e) {
            // ignore invalid data
        }
    }
});
Run Code Online (Sandbox Code Playgroud)

  • 您可以引入一个包装类型,它可以包含有效和无效的值.然后你可以使用`split` +`select`函数将流分成故障流和正确的值流,你可以写入不同的接收器. (4认同)