I am trying to use the streaming API of Flink 0.10.0 with Scala 2.10.4. When I try to compile this first version:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time._
object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)
    val words : DataStream[String] = text.flatMap[String](
      new Function[String,TraversableOnce[String]] {
        def apply(line:String):TraversableOnce[String] = line.split(" ")
      })
    env.execute("Window Stream wordcount")
  }
}
I get a compile-time error:
[error] found : String => TraversableOnce[String]
[error] required: org.apache.flink.api.common.functions.FlatMapFunction[String,String]
[error] new Function[String,TraversableOnce[String]] { def apply(line:String):TraversableOnce[String] = line.split(" ")})
[error] ^
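The error above is consistent with `env` being the Java `StreamExecutionEnvironment` (the class imported from `org.apache.flink.streaming.api.environment`), whose streams only accept a `FlatMapFunction`. A minimal sketch of one possible fix, assuming the Scala streaming API package `org.apache.flink.streaming.api.scala` is on the classpath, is to import that package instead and pass a plain Scala function. The `words.print()` sink is an addition for illustration and is not part of the original program.

```scala
// Wildcard import brings in the Scala StreamExecutionEnvironment, the Scala
// DataStream, and the implicit TypeInformation derivation.
import org.apache.flink.streaming.api.scala._

object Main {
  def main(args: Array[String]): Unit = {
    // This now resolves to the Scala environment, not the Java one.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    // The Scala DataStream accepts a function T => TraversableOnce[R] directly.
    val words: DataStream[String] = text.flatMap { _.split(" ") }

    // Assumption: a sink added only so the job has some output.
    words.print()

    env.execute("Window Stream wordcount")
  }
}
```

With the Scala environment in scope, `socketTextStream` returns a Scala `DataStream[String]`, so the function-literal overload of `flatMap` applies and no explicit `Function` instance is needed.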
In the decompiled DataStream.class that I have included in my project, there are functions that accept this type (the last one):
public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> evidence$12, ClassTag<R> evidence$13) {
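if (flatMapper …
This is a follow-up to the question Flink Scala API "not enough arguments".
I want to be able to pass Flink's DataSet around and do something with it, but the DataSet's type parameter is generic.
This is the problem I am running into now: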
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import scala.reflect.ClassTag
object TestFlink {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")
    val split = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
    id(split).print()
    env.execute()
  }
  def id[K: ClassTag](ds: DataSet[K]): DataSet[K] = ds.map(r => r)
}
I get this error at ds.map(r => r):
Multiple markers at this line
- not enough arguments for …
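A likely cause, assuming the Scala `DataSet.map` needs implicit evidence for both `TypeInformation[R]` and `ClassTag[R]`, is that the generic `id` only asks its caller for a `ClassTag[K]`, so the compiler has nothing to pass as `TypeInformation[K]` inside the method. The sketch below adds a second context bound. The object name `TestFlinkSketch` is hypothetical, and `env.execute()` is left out on the assumption that `print()` already triggers execution in this Flink version.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import scala.reflect.ClassTag

// Hypothetical name; mirrors the TestFlink object from the question.
object TestFlinkSketch {
  // Requiring TypeInformation[K] in addition to ClassTag[K] gives ds.map
  // both implicit evidence parameters for the generic type K.
  def id[K: TypeInformation: ClassTag](ds: DataSet[K]): DataSet[K] = ds.map(r => r)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")
    val split = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }

    // At this call site K is String, so the TypeInformation is derived
    // implicitly via the org.apache.flink.api.scala._ import.
    // Assumption: print() runs the job here, so env.execute() is not called.
    id(split).print()
  }
}
```

The design choice is to push the evidence requirement to the call site, where `K` is a concrete type and the implicit derivation can succeed, rather than trying to construct type information inside the generic method.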