小编shi*_*ali的帖子

必须使用writeStream.start()执行带有流源的查询;

我正试图在spark中读取来自kafka(版本10)的消息并尝试打印它.

     import spark.implicits._

         val spark = SparkSession
              .builder
              .appName("StructuredNetworkWordCount")
              .config("spark.master", "local")
              .getOrCreate()  

            val ds1 = spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "localhost:9092")  
              .option("subscribe", "topicA")
              .load()

           ds1.collect.foreach(println)
           ds1.writeStream
           .format("console")
           .start()

           ds1.printSchema()
Run Code Online (Sandbox Code Playgroud)

获取错误线程"main"中的异常

org.apache.spark.sql.AnalysisException:必须使用writeStream.start();;执行带有流源的查询

scala spark-streaming apache-spark-sql

15
推荐指数
3
解决办法
2万
查看次数

从spark流中的kafka消息中提取时间戳?

尝试从卡夫卡源读取。我想从收到的消息中提取时间戳以进行结构化火花流处理。kafka(版本 0.10.0.0) Spark Streaming(版本 2.0.1)

apache-kafka spark-streaming spark-streaming-kafka

5
推荐指数
1
解决办法
5010
查看次数