我们的Spark环境:DataBricks 4.2(包括Apache Spark 2.3.1,Scala 2.11)
我们试图实现的目标:我们想用一些参考数据丰富流数据,这些参考数据会定期更新。富集通过将流与参考数据结合在一起来完成。
我们实现的目标:我们实现了两个Spark作业(jar):第一个是通过使用.write.mode(SaveMode.Overwrite).saveAsTable(“ TEST_TABLE” )然后调用spark.catalog.refreshTable(“ TEST_TABLE”)
第二项工作(我们称其为流数据)是使用Spark结构化流来流式读取某些数据,然后使用带有表TEST_TABLE的DataFrame.transform()将其加入并写入另一个系统。我们正在.transform()调用的函数中使用spark.read.table(“ TEST_TABLE”)读取参考数据,因此我们在表中获取最新值。不幸的是,每当第一个应用程序更新表时,第二个应用程序就会崩溃。Log4j输出中显示以下消息:
18/08/23 10:34:40 WARN TaskSetManager: Lost task 0.0 in stage 547.0 (TID 5599, 10.139.64.9, executor 0): java.io.FileNotFoundException: dbfs:/user/hive/warehouse/code.db/TEST_TABLE/ part-00000-tid-5184425276562097398-25a0e542-41e4-416f-bae8-469899a72c21-36-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readFile(FileScanRDD.scala:203)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$createNextIterator(FileScanRDD.scala:377)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:295)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:291)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748
Run Code Online (Sandbox Code Playgroud)
我们还尝试在读取表之前使缓存无效,但这会降低性能,但应用程序仍然崩溃。我们怀疑根过程是对参考数据集的惰性评估(它仍然“指向”旧数据,现在不再存在)。 …