有没有一种好的方法可以通过更改表将流加入Spark中?

Ben*_*ann 5 scala apache-spark databricks

我们的Spark环境:DataBricks 4.2(包括Apache Spark 2.3.1,Scala 2.11)

我们试图实现的目标:我们想用一些参考数据丰富流数据,这些参考数据会定期更新。富集通过将流与参考数据结合在一起来完成。

我们实现的目标:我们实现了两个Spark作业(jar):第一个是通过使用.write.mode(SaveMode.Overwrite).saveAsTable(“ TEST_TABLE” )然后调用spark.catalog.refreshTable(“ TEST_TABLE”)

第二项工作(我们称其为流数据)是使用Spark结构化流来流式读取某些数据,然后使用带有表TEST_TABLE的DataFrame.transform()将其加入并写入另一个系统。我们正在.transform()调用的函数中使用spark.read.table(“ TEST_TABLE”)读取参考数据,因此我们在表中获取最新值。不幸的是,每当第一个应用程序更新表时,第二个应用程序就会崩溃。Log4j输出中显示以下消息:

18/08/23 10:34:40 WARN TaskSetManager: Lost task 0.0 in stage 547.0 (TID 5599, 10.139.64.9, executor 0): java.io.FileNotFoundException: dbfs:/user/hive/warehouse/code.db/TEST_TABLE/ part-00000-tid-5184425276562097398-25a0e542-41e4-416f-bae8-469899a72c21-36-c000.snappy.parquet

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readFile(FileScanRDD.scala:203)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$createNextIterator(FileScanRDD.scala:377)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:295)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:291)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748
Run Code Online (Sandbox Code Playgroud)

我们还尝试在读取表之前使缓存无效,但这会降低性能,但应用程序仍然崩溃。我们怀疑根过程是对参考数据集的惰性评估(它仍然“指向”旧数据,现在不再存在)。

您对我们可以采取什么措施来防止此问题有什么建议,或者用动态参考数据加入流的最佳方法是什么?

the*_*tom 6

加入参考数据;不要缓存它,这可以确保您找到源代码。查找由主键 + 计数器表示的最新版本数据,其中该计数器最接近或等于您在 Streaming 应用程序中维护的计数器。每小时写入,附加所有仍然是当前的引用数据,但计数器增加;即新版本。在这里使用镶木地板。