shi*_*ali 15 scala spark-streaming apache-spark-sql
我正试图在spark中读取来自kafka(版本10)的消息并尝试打印它.
import spark.implicits._
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.config("spark.master", "local")
.getOrCreate()
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topicA")
.load()
ds1.collect.foreach(println)
ds1.writeStream
.format("console")
.start()
ds1.printSchema()
Run Code Online (Sandbox Code Playgroud)
获取错误线程"main"中的异常
org.apache.spark.sql.AnalysisException:必须使用writeStream.start();;执行带有流源的查询
ssi*_*ice 15
您正在分支查询计划:从您尝试的相同ds1:
ds1.collect.foreach(...)ds1.writeStream.format(...){...}但是你只是.start()在第二个分支上调用,而另一个分支没有终止而留下了另一个分支,这反过来又引发了你要回来的异常.
解决方案是启动两个分支并等待终止.
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topicA")
.load()
val query1 = ds1.collect.foreach(println)
.writeStream
.format("console")
.start()
val query2 = ds1.writeStream
.format("console")
.start()
ds1.printSchema()
query1.awaitTermination()
query2.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
小智 6
我在这个问题上挣扎了很多。我尝试了各种博客中的每个建议解决方案。但我的情况是,在查询调用 start() 和最后我调用 awaitTerminate() 函数之间几乎没有语句会导致此问题。
请以这种方式尝试,它对我来说非常有用。工作示例:
val query = df.writeStream
.outputMode("append")
.format("console")
.start().awaitTermination();
Run Code Online (Sandbox Code Playgroud)
如果你这样写会导致异常/错误:
val query = df.writeStream
.outputMode("append")
.format("console")
.start()
// some statement
// some statement
query.awaitTermination();
Run Code Online (Sandbox Code Playgroud)
将抛出给定的异常并将关闭您的流驱动程序。
小智 5
我通过使用以下代码解决了问题。
val df = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", "streamTest2")
.load();
val query = df.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)