这似乎是一件很简单的事情,您可能会认为Spark开发人员会以某种方式构建此功能,但我找不到。我见过或尝试过的选项是:
写入Parquet文件并创建外部配置单元表,但我想插入现有的配置单元内部表中。我知道我可以仅使用另一个Spark作业来定期添加此数据来添加此数据,但这并不理想
写一个ForeachWriter我尝试过但很少成功的习惯:
val ds1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", hostPort).option("subscribe", "test").load()
val lines = ds1.selectExpr("CAST(value AS STRING)").as[String]
val words = lines.flatMap(_.split(" "))
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[String] {
import org.apache.spark.sql.SparkSession
override def open(partitionId: Long, version: Long) = true
override def process(value: String) = {
val sparksess = SparkSession.builder.master("local[*]").enableHiveSupport().getOrCreate()
import sparksess.implicits._
val valData = List(value).toDF
valData.write.mode("append").insertInto("stream_test")
sparksess.close
}
override def close(errorOrNull: Throwable) = {}
}
val q = words.writeStream.queryName("words-app").foreach(writer)
val query = q.start()
Run Code Online (Sandbox Code Playgroud)注意:在Open方法中创建Sparksession并在close方法中关闭会话似乎很明显,但是这些操作不止一次迭代,然后失败了
注意:上面的代码甚至在第一次迭代后都可以使用,但是抛出了Spark错误,使我不满意该解决方案可以扩展:
18/03/15 11:10:52 ERROR cluster.YarnScheduler: …Run Code Online (Sandbox Code Playgroud)