Asked by Far*_*rah — tags: scala, sbt, scalatest, apache-spark, apache-spark-standalone
We have a large project with many test suites, each containing on average 3 tests.

For our unit tests we use Spark Standalone, so there is no YARN as a resource manager.

Each test suite:
starts a Spark session:

```scala
implicit val spark = SparkSession
  .builder()
  .config(sparkConf)
  .getOrCreate()
```

extends BeforeAndAfterAll:
```scala
class MyTestsSpec extends WordSpec
  with Matchers
  with BeforeAndAfterAll {
  ...
}
```

and overrides afterAll:
```scala
override def afterAll(): Unit = {
  try {
    spark.stop()
  } finally {
    super.afterAll()
  }
}
```

Our project has a CI job in Jenkins, and that job has become flaky: tests fail intermittently with the following error:
Error message:

```
Job 9 cancelled because SparkContext was shut down
```

Stack trace:

```
org.apache.spark.SparkException: Job 9 cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:820)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:818)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:818)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1732)
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1651)
    at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1921)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1317)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1920)
    at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:581)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2390)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2390)
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2390)
// some business classes
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1078)
    at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
    at org.scalatest.WordSpec.withFixture(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike$class.invokeWithFixture$1(WordSpecLike.scala:1075)
    at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
    at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1088)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:1088)
    at org.scalatest.WordSpec.runTest(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1147)
    at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1147)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:410)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
    at org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1147)
    at org.scalatest.WordSpec.runTests(WordSpec.scala:1881)
    at org.scalatest.Suite$class.run(Suite.scala:1147)
    at org.scalatest.WordSpec.org$scalatest$WordSpecLike$$super$run(WordSpec.scala:1881)
    at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1192)
    at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1192)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1192)
// some business classes
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
// some business classes
    at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
    at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
    at sbt.TestRunner.runTest$1(TestFramework.scala:106)
    at sbt.TestRunner.run(TestFramework.scala:117)
    at sbt.TestFramework$$anon$2$$anonfun$$lessinit$greater$1.$anonfun$apply$1(TestFramework.scala:262)
    at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:233)
    at sbt.TestFramework$$anon$2$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:262)
    at sbt.TestFramework$$anon$2$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:262)
    at sbt.TestFunction.apply(TestFramework.scala:271)
    at sbt.Tests$.processRunnable$1(Tests.scala:307)
    at sbt.Tests$.$anonfun$makeSerial$1(Tests.scala:313)
    at sbt.std.Transform$$anon$3.$anonfun$apply$2(System.scala:46)
    at sbt.std.Transform$$anon$4.work(System.scala:66)
    at sbt.Execute.$anonfun$submit$2(Execute.scala:262)
    at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:16)
    at sbt.Execute.work(Execute.scala:271)
    at sbt.Execute.$anonfun$submit$1(Execute.scala:262)
    at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:174)
    at sbt.CompletionService$$anon$2.call(CompletionService.scala:36)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```

When we run the tests individually, they pass without any problem.
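The sbt frames in the trace (`Tests$.makeSerial`, `ConcurrentRestrictions`) hint that several suites share one JVM, so a `spark.stop()` in one suite's `afterAll` can kill the context another suite is still using. A common mitigation, not from the original post, is to stop sbt from interleaving suites in the same JVM. A minimal sketch for `build.sbt`, assuming a standard sbt 1.x build:

```scala
// build.sbt — hypothetical settings, adjust to your build.
// Run test suites one at a time, so no suite's afterAll can
// shut down a SparkContext that another suite is still using.
Test / parallelExecution := false

// Optionally run tests in a forked JVM, separate from sbt itself,
// so each `sbt test` invocation starts with a fresh SparkContext.
Test / fork := true
```

`Test / parallelExecution` and `Test / fork` are standard sbt keys; this only serializes suites, it does not fix the underlying shared-context problem if suites still stop a session that others reuse.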
I had a similar problem.

I had several tests using Spark, and only the first suite worked. I was calling spark.close() in all of them. After removing that call from every suite, it worked.

After studying the SparkSession code, my conclusion is that since there can be only one SparkContext per JVM, and the tests run in the same JVM, once you stop it the first time it becomes unusable for that "JVM session".
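Following that conclusion, one way to structure this is to share a single lazily created SparkSession across all suites and never stop it mid-run. A sketch only, not the original poster's code; the trait name `SharedSparkSession`, the `local[2]` master, and the app name are assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, Suite}

// Hypothetical helper: suites mix this in instead of building and
// stopping their own session. getOrCreate() returns the one
// SparkContext allowed per JVM, so every suite sees the same,
// still-running context.
trait SharedSparkSession extends BeforeAndAfterAll { this: Suite =>
  implicit lazy val spark: SparkSession = SparkSession
    .builder()
    .master("local[2]")   // assumption: local standalone master for tests
    .appName("unit-tests")
    .getOrCreate()

  // Deliberately no spark.stop() in afterAll: Spark's own JVM
  // shutdown hook cleans up when the whole test run ends.
}
```

A suite would then read `class MyTestsSpec extends WordSpec with Matchers with SharedSparkSession`, with no session code of its own.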
Viewed: 3014 times