必须使用writeStream.start()执行带有流源的查询;

shi*_*ali 15 scala spark-streaming apache-spark-sql

我正试图在spark中读取来自kafka(版本10)的消息并尝试打印它.

     import spark.implicits._

         val spark = SparkSession
              .builder
              .appName("StructuredNetworkWordCount")
              .config("spark.master", "local")
              .getOrCreate()  

            val ds1 = spark.readStream.format("kafka")
              .option("kafka.bootstrap.servers", "localhost:9092")  
              .option("subscribe", "topicA")
              .load()

           ds1.collect.foreach(println)
           ds1.writeStream
           .format("console")
           .start()

           ds1.printSchema()
Run Code Online (Sandbox Code Playgroud)

获取错误线程"main"中的异常

org.apache.spark.sql.AnalysisException:必须使用writeStream.start();;执行带有流源的查询

ssi*_*ice 15

您正在分支查询计划:从您尝试的相同ds1:

  • ds1.collect.foreach(...)
  • ds1.writeStream.format(...){...}

但是你只是.start()在第二个分支上调用,而另一个分支没有终止而留下了另一个分支,这反过来又引发了你要回来的异常.

解决方案是启动两个分支并等待终止.

val ds1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  
  .option("subscribe", "topicA")  
  .load()
val query1 = ds1.collect.foreach(println)
  .writeStream
  .format("console")
  .start()
val query2 = ds1.writeStream
  .format("console")
  .start()

ds1.printSchema()
query1.awaitTermination()
query2.awaitTermination()
Run Code Online (Sandbox Code Playgroud)


小智 6

我在这个问题上挣扎了很多。我尝试了各种博客中的每个建议解决方案。但我的情况是,在查询调用 start() 和最后我调用 awaitTerminate() 函数之间几乎没有语句会导致此问题。

请以这种方式尝试,它对我来说非常有用。工作示例:

val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start().awaitTermination();
Run Code Online (Sandbox Code Playgroud)

如果你这样写会导致异常/错误:

val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start()

    // some statement 
    // some statement 

    query.awaitTermination();
Run Code Online (Sandbox Code Playgroud)

将抛出给定的异常并将关闭您的流驱动程序。


小智 5

我通过使用以下代码解决了问题。

 val df = session
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", "streamTest2")
  .load();

    val query = df.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)