相关疑难解决方法(0)

在flatMap函数中Apache Flink Streaming类型不匹配

尝试在scala 2.10.4中使用0.10.0 flink版本的流API.在尝试编译第一个版本时:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time._

object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val words : DataStream[String] = text.flatMap[String](
      new Function[String,TraversableOnce[String]] { 
        def apply(line:String):TraversableOnce[String] = line.split(" ")
      })

    env.execute("Window Stream wordcount")
  }
}
Run Code Online (Sandbox Code Playgroud)

我收到编译时错误:

[error]  found   : String => TraversableOnce[String]
[error]  required: org.apache.flink.api.common.functions.FlatMapFunction[String,String]
[error]       new Function[String,TraversableOnce[String]] { def apply(line:String):TraversableOnce[String] = line.split(" ")})
[error]       ^
Run Code Online (Sandbox Code Playgroud)

在我已经包含在项目中的反编译版本的DataStream.class中,有接受这种类型的函数(最后一个):

public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> evidence$12, ClassTag<R> evidence$13) {
        if (flatMapper …
Run Code Online (Sandbox Code Playgroud)

scala apache-flink flink-streaming

3
推荐指数
1
解决办法
2011
查看次数

Flink Scala API在泛型参数上起作用

这是关于Flink Scala API"没有足够的论据"的后续问题.

我希望能够传递Flink的DataSet并使用它做一些事情,但数据集的参数是通用的.

这是我现在遇到的问题:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import scala.reflect.ClassTag

object TestFlink {

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val split = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
    id(split).print()

    env.execute()
  }

  def id[K: ClassTag](ds: DataSet[K]): DataSet[K] = ds.map(r => r)
}
Run Code Online (Sandbox Code Playgroud)

我有这个错误ds.map(r => r):

Multiple markers at this line
    - not enough arguments for …
Run Code Online (Sandbox Code Playgroud)

apache-flink

2
推荐指数
1
解决办法
1129
查看次数

标签 统计

apache-flink ×2

flink-streaming ×1

scala ×1