Apache Flink 0.10: how to get the first occurrence of a composite key from an unbounded input DataStream?

tim*_*ler 6 apache-flink flink-streaming

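I am new to Apache Flink. My input is an unbounded data stream (fed into Flink 0.10 via Kafka).

I want to keep only the first occurrence of each primary key (the primary key is contract_num and event_dt).
These "duplicates" occur almost immediately after each other. The source system cannot filter them out for me, so Flink has to do it.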

Here is my input data:
contract_num, event_dt, attr
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:08, Y
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C

Here is the output data I want:
A1, 2016-02-24 10:25:08, X
A1, 2016-02-24 10:25:09, Z
A2, 2016-02-24 10:25:10, C

Note that the second row has been removed, because the key combination of A1 and '2016-02-24 10:25:08' already occurred in the first row.

How can I do this with Flink 0.10?

I was thinking about using keyBy(0,1),
but after that I don't know what to do!

(I am using joda-time and org.flinkspector to set up these tests.)


Til*_*ann 10

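Filtering duplicates over an infinite stream will eventually fail if your key space is larger than your available storage. The reason is that you have to store the keys you have already seen somewhere in order to filter out the duplicates. It is therefore a good idea to define a time window after which you purge the set of keys seen so far.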
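To make the storage argument concrete, here is a minimal plain-Java sketch (no Flink involved) of a seen-key store that forgets keys after a retention period. The class name `ExpiringSeenKeys` and its methods are made up for illustration, and the full scan on every call is only acceptable in a toy example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not a Flink API: remembers keys for a limited time only.
final class ExpiringSeenKeys {
    private final long retentionMillis;
    // key -> time at which the key was first seen (within the current retention period)
    private final Map<String, Long> firstSeenAt = new HashMap<>();

    ExpiringSeenKeys(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Returns true if the key has not been seen within the retention period.
    boolean isFirstOccurrence(String key, long nowMillis) {
        long cutoff = nowMillis - retentionMillis;
        // Drop every key first seen before the cutoff, so memory stays bounded.
        firstSeenAt.values().removeIf(seenAt -> seenAt < cutoff);
        return firstSeenAt.putIfAbsent(key, nowMillis) == null;
    }

    int trackedKeys() {
        return firstSeenAt.size();
    }

    public static void main(String[] args) {
        ExpiringSeenKeys seen = new ExpiringSeenKeys(1000);
        String key = "A1|2016-02-24 10:25:08";
        System.out.println(seen.isFirstOccurrence(key, 0));    // true
        System.out.println(seen.isFirstOccurrence(key, 500));  // false, still remembered
        System.out.println(seen.isFirstOccurrence(key, 2000)); // true, the old entry was purged
    }
}
```

The trade-off is visible in the last call: once a key has been purged, a late duplicate is let through again, so the retention period has to be longer than the gap you expect between duplicates.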

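If you are aware of this problem but want to try it anyway, you can do so by applying a stateful flatMap operation after the keyBy call. The stateful mapper uses Flink's state abstraction to store whether it has already seen an element with this key. That way you also benefit from Flink's fault-tolerance mechanism, because your state is checkpointed automatically.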
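Stripped of Flink, the idea is just a per-key "seen" flag. The following plain-Java sketch runs that logic over the sample rows from the question; `FirstOccurrenceSketch` and `firstOccurrences` are illustrative names, and a `HashSet` stands in for the keyed state that Flink would manage and checkpoint for you.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the keyed "seen" flag; not part of Flink.
final class FirstOccurrenceSketch {

    // Keeps only the records whose (contract_num, event_dt) pair has not appeared before.
    static List<String[]> firstOccurrences(List<String[]> records) {
        Set<List<String>> seenKeys = new HashSet<>();
        List<String[]> result = new ArrayList<>();
        for (String[] record : records) {
            // Composite key: fields 0 and 1, mirroring keyBy(0, 1).
            List<String> key = Arrays.asList(record[0], record[1]);
            // Set.add returns false when the key is already present.
            if (seenKeys.add(key)) {
                result.add(record);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
            new String[] {"A1", "2016-02-24 10:25:08", "X"},
            new String[] {"A1", "2016-02-24 10:25:08", "Y"},
            new String[] {"A1", "2016-02-24 10:25:09", "Z"},
            new String[] {"A2", "2016-02-24 10:25:10", "C"});
        for (String[] record : firstOccurrences(input)) {
            System.out.println(String.join(", ", record));
        }
    }
}
```

Run on the four sample rows, this prints the X, Z and C rows and drops the Y row, which matches the desired output in the question.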

A Flink program that does the job could look like this:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple3<String, Date, String>> input = env.fromElements(Tuple3.of("foo", new Date(1000), "bar"), Tuple3.of("foo", new Date(1000), "foobar"));

    input.keyBy(0, 1).flatMap(new DuplicateFilter()).print();

    env.execute("Test");
}

The implementation of DuplicateFilter depends on the Flink version.

Implementation for versions >= 1.0

public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {

    static final ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("seen", Boolean.class, false);
    private ValueState<Boolean> operatorState;

    @Override
    public void open(Configuration configuration) {
        operatorState = this.getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
        if (!operatorState.value()) {
            // we haven't seen the element yet
            out.collect(value);
            // set operator state to true so that we don't emit elements with this key again
            operatorState.update(true);
        }
    }
}

Implementation for version 0.10

public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {

    private OperatorState<Boolean> operatorState;

    @Override
    public void open(Configuration configuration) {
        operatorState = this.getRuntimeContext().getKeyValueState("seen", Boolean.class, false);
    }

    @Override
    public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
        if (!operatorState.value()) {
            // we haven't seen the element yet
            out.collect(value);
            operatorState.update(true);
        }
    }
}

Update: using a tumbling time window

input.keyBy(0, 1).timeWindow(Time.seconds(1)).apply(new WindowFunction<Iterable<Tuple3<String,Date,String>>, Tuple3<String, Date, String>, Tuple, TimeWindow>() {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, Date, String>> input, Collector<Tuple3<String, Date, String>> out) throws Exception {
        out.collect(input.iterator().next());
    }
})
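One consequence of the tumbling-window variant is worth spelling out: it emits the first element per key and per window, so two duplicates that fall on either side of a window boundary both get through. The plain-Java sketch below illustrates this under the assumption that windows are aligned to multiples of the window size and that timestamps are non-negative; `TumblingFirstSketch`, `Event` and the sample timestamps are made up for illustration. Unlike a real window operator, it emits immediately instead of waiting for the window to fire.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of "first element per key and tumbling window"; not Flink code.
final class TumblingFirstSketch {

    static final class Event {
        final String key;
        final long timestampMillis;
        final String attr;

        Event(String key, long timestampMillis, String attr) {
            this.key = key;
            this.timestampMillis = timestampMillis;
            this.attr = attr;
        }
    }

    // Start of the tumbling window containing the timestamp (assumes timestamp >= 0).
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    // Keeps the first event for every (key, window) combination.
    static List<Event> firstPerKeyAndWindow(List<Event> events, long windowSizeMillis) {
        Set<String> seenSlots = new HashSet<>();
        List<Event> result = new ArrayList<>();
        for (Event event : events) {
            String slot = event.key + "@" + windowStart(event.timestampMillis, windowSizeMillis);
            if (seenSlots.add(slot)) {
                result.add(event);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("A1|10:25:08", 900, "X"),   // window [0, 1000)
            new Event("A1|10:25:08", 1100, "Y"),  // window [1000, 2000): passes despite being a duplicate
            new Event("A1|10:25:08", 1900, "Z")); // same window as Y: dropped
        for (Event event : firstPerKeyAndWindow(events, 1000)) {
            System.out.println(event.attr);
        }
    }
}
```

Here X and Y are only 200 ms apart but land in different one-second windows, so both are emitted. If that matters for your data, the stateful flatMap approach, or a longer window, is the safer choice.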