Sai*_*aif 3 scala sbt akka sbt-assembly akka-http
我正在尝试建立一个简单的akka-http 2.4.2项目来对其进行测试,但是我没有这样做。
我的built.sbt:
import NativePackagerHelper._
lazy val akkaVersion = "2.4.2"
lazy val root = (project in file(".")).
settings(
name := "akkTest",
version := "0.1",
scalaVersion := "2.11.7")
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % akkaVersion
)
enablePlugins(JavaServerAppPackaging)
我在Main.scala中的代码段
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.actor.ActorSystem
object Main extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val serverSource =
    Http().bind(interface = "localhost", port = 8080)
val bindingFuture =
    serverSource.to(Sink.foreach { connection => // foreach materializes the source
    println("Accepted new connection from " + connection.remoteAddress)
    }).run()
}
执行错误抛出:
Uncaught error from thread [default-akka.actor.default-dispatcher-2] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[default]
java.lang.NoSuchMethodError: akka.actor.ActorCell.addFunctionRef(Lscala/Function2;)Lakka/actor/FunctionRef;
        at akka.stream.stage.GraphStageLogic$StageActor.<init>(GraphStage.scala:143)
        at akka.stream.stage.GraphStageLogic.getStageActor(GraphStage.scala:904)
        at akka.stream.impl.io.ConnectionSourceStage$$anon$1.preStart(TcpStages.scala:56)
        at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:468)
        at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:363)
        at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:502)
        at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:539)
        at akka.actor.Actor$class.aroundPreStart(Actor.scala:472)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:493)
        at akka.actor.ActorCell.create(ActorCell.scala:580)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
这一定是我的环境所致,但是我不知道如何跟踪问题。我正在使用jdk 1.8u71
[info] Done updating.
[info] Including from cache: ssl-config-akka_2.11-0.1.3.jar
[info] Including from cache: reactive-streams-1.0.0.jar
[info] Including from cache: akka-http-spray-json-experimental_2.11-2.4.2.jar
[info] Including from cache: config-1.3.0.jar
[info] Including from cache: spray-json_2.11-1.3.2.jar
[info] Including from cache: ssl-config-core_2.11-0.1.3.jar
[info] Including from cache: scala-parser-combinators_2.11-1.0.4.jar
[info] Including from cache: scala-java8-compat_2.11-0.7.0.jar
[info] Including from cache: akka-parsing_2.11-2.4.2.jar
[info] Including from cache: akka-http-experimental_2.11-2.4.2.jar
[info] Including from cache: akka-actor_2.11-2.4.2.jar
[info] Including from cache: akka-http-core_2.11-2.4.2.jar
[info] Including from cache: akka-stream_2.11-2.4.2.jar
[info] Including from cache: scala-library-2.11.7.jar
请记住,我仅指向相同Akka版本的依赖项
该程序在使用sbt run时可以正常运行,但是在将装配好的jar与我自己的scala启动器一起使用时会失败
问题是Spark内部使用Akka。只要您将Spark Job作为一个独立的应用程序(例如sbt run)运行,这都不是问题,因为将使用您自己的Akka版本。但是,一旦您使用提交应用程序到集群,情况就会发生变化spark-submit。然后,Spark类加载器将通过捆绑在您的Akka实现中选择Akka的内部版本sparkJob.jar。在NoSuchMethodError上面,因此来自akka-stream_2.11-2.4.2调用到akka-actor_2.11-2.3.x.jar其在火花中,而不是akka-actor_2.11-2.4.2.jar它在你的工作是绑定的。该方法addFunctionRef实际上是最近添加的并且在早期版本的Akka中不存在。您可以通过在发生异常的位置设置断点来验证这一点(或使用异常断点)。随着应用程序停在中的问题位置GraphStage,进行评估
materializer.supervsor.getClass().getResource("ActorCell.class")
这将打印出ActorCell从中加载类文件的位置。
为确保您与Spark内部使用的Akka版本保持隔离,可以使用的--driver-class-path选项spark-submit,例如
spark-submit --class MyJob \
  --driver-class-path akka-actor_2.11-2.4.2.jar \
  --master spark://master:7077 \
  sparkJob.jar
如果你这样做,我也建议设定akka-actor到"test, provided"你build.sbt,让你不还包括akka-actor在sparkJob.jar。
| 归档时间: | 
 | 
| 查看次数: | 4351 次 | 
| 最近记录: |