Hum*_*ura 2 scala apache-spark apache-flink
如何将此示例 scala spark 代码转换为 apache flink?
reduceByKey( (x, y) => (x._1 + y._1, ( (x._2) ++ y._2) ) )
Run Code Online (Sandbox Code Playgroud)
我意识到reduceByKeyflink 中不存在,但它显示了我想要实现的目标。
谢谢任何帮助!
与 Spark 不同,Flink 不需要键值对来执行 reduce、join 和 coGroup 操作。它可以直接在任何类型上执行它们,例如 POJO、元组或用户类型。你必须提供给 Flink 的是它必须分组的字段。这可以是提取键、逻辑索引或字段名称的函数。当您随后调用该reduce操作时,整个对象将被提供给reduce 函数,而不仅仅是值部分。
因此,假设您有一个input: DataSet[(K, (T, List[U]))]withK作为键类型,那么您的 reduce 函数将如下所示:
input.groupBy(0).reduce{
(left: (K, (T, List[U])), right: (K, (T, List[U]))) =>
val (key, (leftValue1, leftValue2)) = left
val (_, (rightValue1, rightValue2)) = right
(key, (leftValue1 + rightValue1, leftValue2 ++ rightValue2))
}
Run Code Online (Sandbox Code Playgroud)
为了便于理解,我还为匿名函数提供了类型注释。但这不是必需的。
这是 Humberto 特定用例的解决方案,假设输入字段由具有 3 个条目的行组成,空格分隔并且第三个条目是整数:
val input = env.readCsvFile[(String, String, Int)](filePath, lineDelimiter = "\n", fieldDelimiter = " ")
val result = input
.map (element => (element._1, element._3, Map(element._2 -> element._3)))
.groupBy(0)
.reduce{
(left, right) =>
val (key, left1, left2) = left
val (_, right1, right2) = right
(key, left1 + right1, left2 ++ right2)
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1134 次 |
| 最近记录: |