任何人都可以在Scala中共享Flink Kafka示例吗?

wdz*_*wdz 4 scala apache-kafka apache-flink

任何人都可以在Scala中分享Flink Kafka(主要是从Kafka接收消息)的工作示例吗?我知道Spark中有一个KafkaWordCount示例.我只需要在Flink打印出Kafka消息.这真的很有帮助.

Rob*_*ger 7

以下代码显示了如何使用Flink的Scala DataStream API读取Kafka主题:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082
import org.apache.flink.streaming.util.serialization.SimpleStringSchema


object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    val stream = env
      .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties))
      .print

    env.execute("Flink Kafka Example")
  }
}
Run Code Online (Sandbox Code Playgroud)