Apache Flink: execution environment and multiple sinks

Jin*_*n.J · 2 · java, apache-flink

My question may cause some confusion, so please read the description first; it may help pinpoint my problem. I will add my code at the end of the question (any suggestions about my code structure/implementation are welcome as well). Thanks in advance for any help!


My questions:

  1. How can I define multiple sinks in a Flink batch job without having it fetch the data from the source repeatedly?

  2. What is the difference between createCollectionsEnvironment() and getExecutionEnvironment()? Which one should I use in a local environment?

  3. What is env.execute() for? My code outputs the result without this line. If I add it, it throws an exception:

Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:940)
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:922)
    at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:34)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816)
    at MainClass.main(MainClass.java:114)

Description:
I'm new to programming. Recently I need to process some data with Flink batch (grouping the data, calculating standard deviation, etc.). However, I ran into a situation where I need to output two DataSets. The structure is like this:


Source (database) -> DataSet 1 (add an index using zipWithIndex()) -> DataSet 2 (do some calculations while keeping the index) -> DataSet 3


First I output DataSet 2; the indices are, for example, 1 to 10000. Then I output DataSet 3 and the indices become 10001 to 20000, although I did not change the values in any function. My guess is that when outputting DataSet 3, instead of reusing the result already in DataSet 2, the program fetches the data from the source again and runs zipWithIndex() a second time, which not only gives wrong index numbers but also doubles the number of connections to the database.
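
To make that suspicion concrete, here is a minimal, hypothetical sketch (an in-memory data set stands in for my JDBC source): every terminal operation such as print() or collect() triggers a fresh execution of the whole plan from the source, so zipWithIndex() runs once per call.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

public class DoubleExecutionDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

        // Stand-in for the JDBC source: a small in-memory data set.
        DataSet<String> source = env.fromElements("a", "b", "c");
        DataSet<Tuple2<Long, String>> indexed = DataSetUtils.zipWithIndex(source);

        indexed.print(); // first execution: the plan runs from the source
        indexed.print(); // second execution: the plan runs from the source again,
                         // and zipWithIndex() assigns a fresh set of indices
    }
}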


I guess this has something to do with the execution environment: when I use


ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();


it gives the "wrong" index numbers (10001-20000), while


ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


gives the correct index numbers (1-10000). The two runs also differ in elapsed time and in the number of database connections, and the print order is reversed.


OS, database, other environment details and versions:
IntelliJ IDEA 2017.3.5 (Community Edition)
Build #IC-173.4674.33, built on March 6, 2018
JRE: 1.8.0_152-release-1024-b15 amd64
JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o
Windows 10 10.0


My test code (Java):


public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

    // The Table API is used to calculate the standard deviation, as I figured
    // that there is no such calculation in DataSet.
    BatchTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);

    // Get data from a MySQL database
    DataSet<Row> dbData =
            env.createInput(
                    JDBCInputFormat.buildJDBCInputFormat()
                            .setDrivername("com.mysql.cj.jdbc.Driver")
                            .setDBUrl($database_url)
                            .setQuery("select value from $table_name where id =33")
                            .setUsername("username")
                            .setPassword("password")
                            .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.DOUBLE_TYPE_INFO))
                            .finish()
            );

    // Add an index for assigning groups (group capacity is 5)
    DataSet<Tuple2<Long, Row>> indexedData = DataSetUtils.zipWithIndex(dbData);

    // Replace the index (long) with a group number (int), and convert Row to Double at the same time
    DataSet<Tuple2<Integer, Double>> rawData = indexedData.flatMap(new GroupAssigner());

    // Use groupBy() to combine the individual values of each group into a list,
    // while calculating the mean and range of each group;
    // put them into a POJO named GroupDataClass
    DataSet<GroupDataClass> groupDS = rawData.groupBy("f0").combineGroup(new GroupCombineFunction<Tuple2<Integer, Double>, GroupDataClass>() {
        @Override
        public void combine(Iterable<Tuple2<Integer, Double>> iterable, Collector<GroupDataClass> collector) {
            Iterator<Tuple2<Integer, Double>> it = iterable.iterator();
            Tuple2<Integer, Double> var1 = it.next();
            int groupNum = var1.f0;

            // Use max and min to calculate the range, i and sum to calculate the mean
            double max = var1.f1;
            double min = max;
            double sum = max; // include the first element in the sum (starting at 0 skews the mean)
            int i = 1;

            // The list stores the individual values
            List<Double> list = new ArrayList<>();
            list.add(max);

            while (it.hasNext()) {
                double next = it.next().f1;
                sum += next;
                i++;
                max = next > max ? next : max;
                min = next < min ? next : min;
                list.add(next);
            }

            // Store group number, mean, range, and the 5 individual values of the group
            collector.collect(new GroupDataClass(groupNum, sum / i, max - min, list));
        }
    });

    // print, because if no sink is created Flink will not even perform the calculation.
    groupDS.print();

    // Get the max group number and the range of each group to calculate the average range.
    // If group numbers start at 1, the maximum group number equals the number of groups.
    // However, because this is the second sink, data flows from the source again, which doubles the group number.
    DataSet<Tuple2<Integer, Double>> rangeDS = groupDS.map(new MapFunction<GroupDataClass, Tuple2<Integer, Double>>() {
        @Override
        public Tuple2<Integer, Double> map(GroupDataClass in) {
            return new Tuple2<>(in.groupNum, in.range);
        }
    }).max(0).andSum(1);

    // collect and print, as if no sink is created Flink will not even perform the calculation.
    Tuple2<Integer, Double> rangeTuple = rangeDS.collect().get(0);
    double range = rangeTuple.f1 / rangeTuple.f0;
    System.out.println("range = " + range);
}

public static class GroupAssigner implements FlatMapFunction<Tuple2<Long, Row>, Tuple2<Integer, Double>> {
    @Override
    public void flatMap(Tuple2<Long, Row> input, Collector<Tuple2<Integer, Double>> out) {
        // zipWithIndex starts at 0, so indices 0-4 go to group 1, 5-9 to group 2, etc.
        int n = (int) (input.f0 / 5) + 1;
        out.collect(new Tuple2<>(n, (Double) input.f1.getField(0)));
    }
}

Art*_*hur · 6

  1. It is fine to connect a source to several sinks: the source is executed only once and the records are broadcast to the multiple sinks (see the sketch after this list). See this question: Can Flink write results into multiple files (like Hadoop's MultipleOutputFormat)?

  2. getExecutionEnvironment is the right way to get the environment when you want to run your job. createCollectionsEnvironment is a good way to play around and test. See the documentation.

  3. The exception message is quite clear: if you call print or collect, your data flow gets executed. So you have two choices:

  • Either you call print/collect at the end of your data flow, and it gets executed and printed. That is good for testing. Bear in mind that you can only call collect/print once per data flow, otherwise it is executed several times while not being completely defined;
  • Or you add a sink at the end of your data flow and call env.execute(). That is what you want to do once your flow is in a more mature shape; a minimal sketch of this pattern follows below.
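
Putting the three points together, here is a minimal sketch (the data set and output paths are placeholders, not taken from the original code): both sinks belong to the same execution plan, so the source is read once, and a single env.execute() runs everything.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class MultiSinkDemo {
    public static void main(String[] args) throws Exception {
        // getExecutionEnvironment: a local mini-cluster inside the IDE, the real
        // cluster environment when the job is submitted with "flink run" (point 2).
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the JDBC source of the question.
        DataSet<String> source = env.fromElements("a", "b", "c");

        DataSet<String> upper = source.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) {
                return s.toUpperCase();
            }
        });

        // Two sinks on the same plan (point 1): the source is executed once
        // and its records are broadcast to both sinks.
        source.writeAsText("/tmp/raw-output");   // placeholder path
        upper.writeAsText("/tmp/upper-output");  // placeholder path

        // One call to execute() runs the whole plan (point 3). Without any sink
        // (or a prior print/collect), execute() fails with the "No new data sinks
        // have been defined" exception from the question.
        env.execute("one source, two sinks");
    }
}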