Apache Flink:如何使用Flink DataSet API从一个数据集创建两个数据集

eas*_*lek 2 apache-flink

我正在使用Flink 0.10.1的DataSet API编写应用程序.我可以在Flink中使用单个运算符获得多个收集器吗?

我想做的是如下:

val lines = env.readTextFile(...)
val (out_small, out_large) = lines **someOp** {
  (iterator, collector1, collector2) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector1.collect(elem1)
      collector2.collect(elem2)
    }
  } 
} 
Run Code Online (Sandbox Code Playgroud)

目前我正在调用mapPartition两次从一个源数据集生成两个数据集.

val lines = env.readTextFile(...)
val out_small = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect(elem1)
    }
  } 
}
val out_large = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect(elem2)
    }
  } 
}
Run Code Online (Sandbox Code Playgroud)

由于doParsing功能非常昂贵,我想每行只调用一次.

ps如果你能让我知道其他方法以更简单的方式做这种事情,我将非常感激.

Mat*_*Sax 6

Flink不支持多个收藏家.但是,您可以通过添加指示输出类型的附加字段来更改解析步骤的输出:

val lines = env.readTextFile(...)
val intermediate = lines **someOp** {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect(0, elem1) // 0 indicates small
      collector.collect(1, elem2) // 1 indicates large
    }
  } 
} 
Run Code Online (Sandbox Code Playgroud)

接下来,您将使用intermediate两次输出,并为第一个属性筛选每个输出.第一个过滤器用于0第二个过滤器1(您还可以添加投影以去除第一个属性).

               +---> filter("0") --->
               | 
intermediate --+
               | 
               +---> filter("1") --->
Run Code Online (Sandbox Code Playgroud)