如何在scala中使用flink fold函数

sth*_*ers 5 scala apache-flink

这是一个非常有效的尝试使用Flink折叠与scala匿名函数:

val myFoldFunction = (x: Double, t:(Double,String,String)) => x + t._1
env.readFileStream(...).
...
.groupBy(1)
.fold(0.0, myFoldFunction : Function2[Double, (Double,String,String), Double])
Run Code Online (Sandbox Code Playgroud)

它汇编得很好,但在执行时,我得到了"类型擦除问题"(见下文).在Java中这样做很好,但当然更冗长.我喜欢简洁明了的lambda.我怎么能在scala中做到这一点?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation,scala.reflect.ClassTag)' could not be determined. 
This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
Run Code Online (Sandbox Code Playgroud)

Til*_*ann 3

您遇到的问题是 Flink [1] 中的一个错误。该问题源于 FlinkTypeExtractor以及 Scala DataStream API 在 Java 实现之上的实现方式。无法TypeExtractor生成TypeInformationScala 类型的 a ,因此返回 a MissingTypeInformation。此缺失的类型信息是在创建StreamFold运算符后手动设置的。然而,该StreamFold运算符的实现方式是不接受 a MissingTypeInformation,因此在设置正确的类型信息之前会失败。

我已经打开了一个拉取请求 [2] 来解决这个问题。应该会在接下来的两天内合并。通过使用最新的 0.10 快照版本,您的问题应该得到解决。