Posted by Tom*_*tom

Spark Streaming: read JSON from Kafka and write JSON to another Kafka topic

I am trying to build a Spark Streaming application (Spark 2.1, Kafka 0.10).

I need to read data from the Kafka topic "input", select the matching records, and write the result to the topic "output".

I can read data from Kafka using the KafkaUtils.createDirectStream method.
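For context, the `ssc`, `topics` and `kafkaParams` values passed to createDirectStream are not shown in the post. A minimal sketch of what they might look like with the spark-streaming-kafka-0-10 integration follows; the application name, master, batch interval, group id and offset policy are all placeholder assumptions, and only the broker address and the topic name come from the post.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumed values: app name, master and batch interval are placeholders.
val spark = SparkSession.builder()
  .appName("kafka-json-filter")
  .master("local[2]")
  .getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

// Topic name taken from the post; everything else in the map is an assumption.
val topics = Array("input")
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "json-filter-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
```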

I parse the RDD as JSON into a DataFrame and have prepared a filter:

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

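// foreachRDD returns Unit, so the result is not assigned to a val
messages.map(v => v.value).foreachRDD { rdd =>
  import spark.implicits._

  // Parse each JSON string against the predefined schema
  val PeopleDf = spark.read.schema(schema1).json(rdd)
  PeopleDf.show()
  // Keep rows where value1 contains "1" or value2 equals 2
  val PeopleDfFilter = PeopleDf.filter(($"value1".rlike("1")) || ($"value2" === 2))
  PeopleDfFilter.show()
}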
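It may help to spell out what the filter above accepts: `rlike` is a regular-expression search, so `rlike("1")` matches any string that contains a "1", not only the exact string "1". The plain-Scala sketch below mimics the predicate without Spark; the `Person` case class and its field types are assumptions made for illustration only.

```scala
// Plain-Scala stand-in for the DataFrame filter; Person and its field types are assumptions.
final case class Person(value1: String, value2: Int)

// Mirrors ($"value1".rlike("1")) || ($"value2" === 2):
// the regex only has to be found somewhere in value1 (partial match).
def keep(p: Person): Boolean =
  "1".r.findFirstIn(p.value1).isDefined || p.value2 == 2

val sample = Seq(Person("1", 0), Person("21", 0), Person("x", 2), Person("x", 3))
val kept = sample.filter(keep)
// kept holds Person("1", 0), Person("21", 0) and Person("x", 2)
```

If only the exact value "1" should match, an anchored pattern such as `^1$` or an equality test would be the stricter choice.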

I can load data from Kafka and write it "as is" back to Kafka using KafkaProducer:

    messages.foreachRDD( rdd => {
      rdd.foreachPartition( partition => {
        val kafkaTopic = "output"
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        partition.foreach{ record: ConsumerRecord[String, String] => {
        System.out.print("########################" + record.value())
        val …
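One possible way to combine the two pieces, sending the filtered rows instead of the raw records, is sketched below. It is not taken from the post: `writeJsonToKafka` is a hypothetical helper, and it assumes that `Dataset.toJSON` output is an acceptable JSON format for the "output" topic.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.sql.DataFrame

// Hypothetical helper: turns each row of df into a JSON string and sends it to `topic`.
def writeJsonToKafka(df: DataFrame, topic: String, bootstrapServers: String): Unit = {
  df.toJSON.rdd.foreachPartition { rows =>
    // KafkaProducer is not serializable, so it is created on the executor, once per partition.
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)
    try {
      // Records are sent without a key, so the producer chooses the partition.
      rows.foreach(json => producer.send(new ProducerRecord[String, String](topic, json)))
    } finally {
      producer.close() // waits for pending sends to complete before releasing resources
    }
  }
}
```

Inside the `foreachRDD` block that builds `PeopleDfFilter`, the call would be `writeJsonToKafka(PeopleDfFilter, "output", "localhost:9092")`. Creating a producer for every partition of every batch keeps the sketch simple; reusing a producer per executor is a common refinement when throughput matters.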

scala apache-kafka apache-spark spark-streaming
