Flink 的简单 hello world 示例

Den*_*din 7 apache-flink

我正在寻找 Apache flink 的 hello-world 体验的最简单的示例。

假设我刚刚在一个干净的盒子上安装了 flink,那么为了“让它做某事”我需要做的最低限度是什么。我意识到这很模糊,这里有一些例子。

来自终端的三个 python 示例:

python -c "print('hello world')"
python hello_world.py
python python -c "print(1+1)"
Run Code Online (Sandbox Code Playgroud)

当然,流应用程序要复杂一些,但这里与我之前为 Spark 流所做的类似:

https://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example

正如您所看到的,这些示例有一些很好的属性:

  1. 它们是最小的
  2. 对其他工具/资源的依赖性最小
  3. 逻辑可以进行简单调整(例如不同的数字或不同的分隔符)

所以我的问题是:

Flink 最简单的 hello world 示例是什么


到目前为止,我发现的是需要编译的包含 50 行代码的示例。

如果由于第 3 点而无法避免这种情况,那么满足第 1 点和第 2 点并使用(仅)默认提供的 jar 或从信誉良好的来源轻松获得的 jar 也可以。

Dav*_*son 6

好的,这个怎么样:

public static void main(String[] args) throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  env.fromElements(1, 2, 3, 4, 5)
    .map(i -> 2 * i)
    .print();

  env.execute();
}
Run Code Online (Sandbox Code Playgroud)


Anu*_*and 5

在大多数大数据和相关框架中,我们都以 Word Count 程序作为 Hello World 的例子。下面是Flink中字数统计的代码:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<String> text = env.fromCollection(Arrays.asList("This is line one. This is my line number 2. Third line is here".split(". ")));

    DataSet<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : line.split(" ")) {
              out.collect(new Tuple2<>(word, 1));
            }
          }
        })
        .groupBy(0)
        .sum(1);

wordCounts.print();
Run Code Online (Sandbox Code Playgroud)

从文件中读取

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    //The path of the file, as a URI
    //(e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
    DataSet<String> text = env.readTextFile("/path/to/file");

    DataSet<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : line.split(" ")) {
              out.collect(new Tuple2<String, Integer>(word, 1));
            }
          }
        })
        .groupBy(0)
        .sum(1);

    wordCounts.print();
Run Code Online (Sandbox Code Playgroud)

不要使用 try catch 处理 wordCounts.print() 上引发的异常,而是将 throw 添加到方法签名中。

将以下依赖项添加到 pom.xml。

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.8.0</version>
</dependency> 
Run Code Online (Sandbox Code Playgroud)

在这里阅读有关 flatMap、groupBy、sum 和其他 flink 操作的信息:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/

Flink Streaming 文档和示例:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html