我正在使用Flink 0.10.1的DataSet API编写应用程序.我可以在Flink中使用单个运算符获得多个收集器吗?
我想做的是如下:
val lines = env.readTextFile(...)
val (out_small, out_large) = lines **someOp** {
(iterator, collector1, collector2) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector1.collect(elem1)
collector2.collect(elem2)
}
}
}
Run Code Online (Sandbox Code Playgroud)
目前我正在调用mapPartition两次从一个源数据集生成两个数据集.
val lines = env.readTextFile(...)
val out_small = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(elem1)
}
}
}
val out_large = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val …Run Code Online (Sandbox Code Playgroud) apache-flink ×1