I am trying to read a Kafka topic with Apache Spark Streaming, but I cannot figure out how to convert the data in the DStream into a DataFrame and then store it in a temporary table. The messages in Kafka are in Avro format and were created from a database by Kafka JDBC Connect. My code is further down; it works fine until it reaches spark.read.json to read the JSON into a DataFrame.
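To make clear what I am aiming for, here is a minimal sketch of the DStream-to-DataFrame-to-temp-view pattern, simplified by assuming the record values arrive as plain JSON strings via StringDeserializer rather than Avro, and that Spark is 2.2+ (where spark.read.json accepts a Dataset[String]). The topic name my-topic, the group id, the view name kafka_messages, and the object name are placeholders I made up:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object DStreamToDataFrameSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("dstream-to-df-sketch")
      .getOrCreate()
    import spark.implicits._

    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    // Placeholder consumer settings; the values are plain strings here, not Avro
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "<kafka-server>:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-consumer-sketch",
      "auto.offset.reset" -> "earliest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("my-topic"), kafkaParams))

    // Turn each micro-batch into a DataFrame and expose it as a temporary view
    stream.map(_.value).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // spark.read.json on a Dataset[String] parses one JSON document per record
        val df = spark.read.json(rdd.toDS())
        df.createOrReplaceTempView("kafka_messages")
        spark.sql("SELECT * FROM kafka_messages").show(false)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

My actual code follows; there the values are Avro coming from Kafka JDBC Connect, and it only works up to the spark.read.json call.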
package consumerTest
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import scala.util.parsing.json.{JSON, JSONObject}
object Consumer {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local")
      .appName("my-spark-app")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._

    // Streaming context with a 10-second batch interval
    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
    // Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "<kafka-server>:9092",
      "key.deserializer" -> …

apache-spark spark-streaming kafka-consumer-api spark-dataframe