Kafka与Scala一起流

use*_*735 1 scala apache-kafka kafka-consumer-api apache-kafka-streams

我试图使用下面scala的kafka流是我的Java代码,它完全正常:

KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textLines = builder.stream("TextLinesTopic");
    textLines.foreach((key,values) -> {
        System.out.println(values);
    });

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
Run Code Online (Sandbox Code Playgroud)

我的scala代码如下:

  val builder = new KStreamBuilder
  val textLines:KStream[String, String]  = builder.stream("TextLinesTopic")
  textLines.foreach((key,value)-> {
   println(key)
  })

  val streams = new KafkaStreams(builder, config)
  streams.start()
Run Code Online (Sandbox Code Playgroud)

scala代码抛出编译错误.期望类型不匹配:找不到ForEachAction [ > String, > String],Actual((any,any),Unit):找不到值键:值value

有没有人知道如何在scala中使用流API

L.L*_*art 5

你的语法错了:).->只是创建对的运算符,所以表达式

(key,value)-> {
  println(key)
}
Run Code Online (Sandbox Code Playgroud)

有一个类型((任意,任意),单位),因为编译器不能推断出任何类型的信息(以及keyvalue缺失)

如果您正在使用Scala的2.12更换->=>应解决的问题,但如果你使用Scala的旧版本,你必须明确地实现Java双功能:

 textLines.foreach(new BiFunction[T1, T2] { ... })
Run Code Online (Sandbox Code Playgroud)