如何在其他的基础上过滤Apache flink流?

Anu*_*ain 2 apache-flink flink-streaming

我有两个流,一个是 Int ,另一个是 json 。在 json Schema 中有一个键是一些 int 。所以我需要通过与另一个整数流的键比较来过滤 json 流,所以它可能在 Flink 中吗?

Dav*_*son 6

是的,你可以用 Flink 做这种流处理。您需要从 Flink 获得的基本构建块是连接流和有状态函数——这是一个使用 RichCoFlatMap 的示例:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class Connect {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> control = env.fromElements(
                new Event(17),
                new Event(42))
                .keyBy("key");

        DataStream<Event> data = env.fromElements(
                new Event(2),
                new Event(42),
                new Event(6),
                new Event(17),
                new Event(8),
                new Event(42)
                )
                .keyBy("key");

        DataStream<Event> result = control
                .connect(data)
                .flatMap(new MyConnectedStreams());

        result.print();

        env.execute();
    }

    static final class MyConnectedStreams
            extends RichCoFlatMapFunction<Event, Event, Event> {

        private ValueState<Boolean> seen = null;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
                    // state name
                    "have-seen-key",
                    // type information of state
                    TypeInformation.of(new TypeHint<Boolean>() {
                    }));
            seen = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap1(Event control, Collector<Event> out) throws Exception {
            seen.update(Boolean.TRUE);
        }

        @Override
        public void flatMap2(Event data, Collector<Event> out) throws Exception {
            if (seen.value() == Boolean.TRUE) {
                out.collect(data);
            }
        }
    }


    public static final class Event {
        public Event() {
        }

        public Event(int key) {
            this.key = key;
        }

        public int key;

        public String toString() {
            return String.valueOf(key);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

在这个例子中,只有那些在控制流上看到的键才会通过数据流——所有其他事件都被过滤掉。我利用了Flink 的托管键控状态连接流

为了简单起见,我忽略了您对数据流具有 JSON 的要求,但您可以在其他地方找到有关如何使用 JSON 和 Flink 的示例。

请注意,您的结果将是不确定的,因为您无法控制两个流相对于彼此的时间。您可以通过向流添加事件时间时间戳来管理此问题,然后改用 RichCoProcessFunction。