Multiple window validations in Apache Flink

Jes*_*iga 5 java apache-flink flink-streaming

I have just started using Apache Flink for stream processing. I am receiving a stream of JSON messages that look like this:

{
  "token_id": "tok_afgtryuo",
  "ip_address": "128.123.45.1",
  "device_fingerprint": "abcghift",
  "card_hash": "hgtyuigash",
  "bin_number": "424242",
  "last4": "4242",
  "name": "Seu Jorge"
}

I was asked whether I could enforce the following business rules:

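  • Decline if the number of tokens for this IP in the last 10 seconds is > 5

  • Decline if the number of tokens for this IP in the last minute is > 15

  • Decline if the number of tokens for this IP in the last hour is > 60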
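
Read literally, each rule is a count of events per IP over a trailing time span, compared against a threshold. The following is a minimal sketch of that reading in plain Java, with no Flink dependency. The class name `SlidingIpCounter` and its `record` method are hypothetical, and the sketch assumes timestamps in milliseconds that arrive in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: counts tokens per IP over a trailing time span. */
final class SlidingIpCounter {
    private final long spanMillis;
    private final int maxTokens;
    // One queue of event timestamps per IP address, oldest first.
    private final Map<String, Deque<Long>> timestampsByIp = new HashMap<>();

    SlidingIpCounter(long spanMillis, int maxTokens) {
        this.spanMillis = spanMillis;
        this.maxTokens = maxTokens;
    }

    /**
     * Records one token and returns true if it should be declined.
     * Assumption: timestamps for a given IP never decrease.
     */
    boolean record(String ip, long timestampMillis) {
        Deque<Long> seen = timestampsByIp.computeIfAbsent(ip, k -> new ArrayDeque<>());
        // Drop timestamps that are no longer inside the trailing span.
        while (!seen.isEmpty() && seen.peekFirst() <= timestampMillis - spanMillis) {
            seen.pollFirst();
        }
        seen.addLast(timestampMillis);
        return seen.size() > maxTokens;
    }
}
```

Under these assumptions, the three rules would correspond to `new SlidingIpCounter(10_000L, 5)`, `new SlidingIpCounter(60_000L, 15)` and `new SlidingIpCounter(3_600_000L, 60)`.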

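I created two classes. The first is the main class, where I create a RuleMaker instance and call its window function with different parameters for each rule, to avoid duplicating code: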

Main

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //This DataStream Would be  Converting the Json to a Token Object
        DataStream<Token> baseStream =
                env.addSource(new SocketTextStreamFunction("localhost",
                        9999,
                        "\n",
                        1))
                        .map(new MapTokens());


        // 1- First rule Decline if number of tokens > 5 for this IP in last 10 seconds
       DataStreamSink<String> response1 =  new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.seconds(10),
               5, "seconds").print();

        //2 -Decline if number of tokens > 15 for this IP in last minute
        DataStreamSink<String> response2 = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.minutes(1),
                15, "minutes").print();

        //3- Decline if number of tokens > 60 for this IP in last hour
        DataStreamSink<String> response3  = new RuleMaker().getStreamKeyCount(baseStream, "ip", Time.hours(1),
                60, "Hours").print();

        env.execute("Job2");
    }

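In the other class I implement all the logic for the rules: I count how many times an IP address appears, and if the count exceeds the number allowed within the time window, I emit a message with some information: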

RuleMaker

public class RuleMaker {


    public DataStream<String> getStreamKeyCount(DataStream<Token> stream, 
                                                String tokenProp,
                                                Time time, 
                                                Integer maxPetitions, 
                                                String ruleType){

        return
               stream
                .flatMap(new FlatMapFunction<Token, Tuple3<String, Integer, String>>() {
                    @Override
                    public void flatMap(Token token, Collector<Tuple3<String, Integer, String>> collector) throws Exception {

                         String tokenSelection = "";
                        switch (tokenProp)
                        {
                            case "ip":
                                tokenSelection = token.getIpAddress();
                                break;
                            case "device":
                                tokenSelection = token.getDeviceFingerprint();
                                break;
                            case "cardHash":
                                tokenSelection = token.getCardHash();
                                break;
                        }
                        collector.collect(new Tuple3<>(tokenSelection, 1, token.get_tokenId()));
                    }
                })
                .keyBy(0)
                .timeWindow(time)
                .process(new MyProcessWindowFunction(maxPetitions, ruleType));
    }

    //Class to process the elements from the window
    private class MyProcessWindowFunction extends ProcessWindowFunction<
            Tuple3<String, Integer, String>,
            String,
            Tuple,
            TimeWindow
            > {

        private Integer _maxPetitions;
        private String  _ruleType;


        public MyProcessWindowFunction(Integer maxPetitions, String ruleType) {
            this._maxPetitions = maxPetitions;
            this._ruleType = ruleType;
        }

        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple3<String, Integer, String>> iterable, Collector<String> out) throws Exception {

            Integer counter = 0;
            for (Tuple3<String, Integer, String> element : iterable) {
                // f1 is always 1, so this counts the elements in the window
                counter += element.f1;
                if(counter > _maxPetitions){
                    out.collect("El elemeto ha sido declinado: " + element.f2 + " Num elements: " + counter + " rule type: " +  _ruleType + " token: " + element.f0 );
                    counter = 0;
                }
            }
        }
    }
}
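
One point that may be worth checking here: in Flink, `timeWindow` called with a single size argument defines tumbling (fixed, non-overlapping) windows, so the count restarts at every window boundary rather than covering the trailing "last 10 seconds". The plain-Java sketch below imitates that bucketing without Flink. The class `TumblingBuckets` and its methods are hypothetical, and it assumes non-negative millisecond timestamps with zero window offset.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of how fixed-size windows bucket events. */
final class TumblingBuckets {
    /** Start of the fixed-size window that contains the given timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    /** Counts events per tumbling window, keyed by window start. */
    static Map<Long, Integer> countPerWindow(long[] timestampsMillis, long sizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMillis) {
            counts.merge(windowStart(ts, sizeMillis), 1, Integer::sum);
        }
        return counts;
    }
}
```

For example, six events at 8.0 s, 8.5 s, 9.0 s, 10.0 s, 10.5 s and 11.0 s fall within three seconds of each other, yet with a 10-second size they split into two buckets of three events each, so neither bucket exceeds a threshold of 5.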

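So far I think this code works, but I am a beginner with Apache Flink, so I would appreciate it if you could tell me whether there is anything wrong with the way I am using it and point me in the right direction.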

Thank you very much.