Tags: kryo, sbt-assembly, apache-spark, spark-structured-streaming
1 - The problem

I have a Spark program that uses Kryo, but not as part of the Spark machinery. More specifically, I use Spark Structured Streaming connected to Kafka.

I read binary values coming from Kafka and decode them myself.

I run into an exception when trying to deserialize the data with Kryo. This only happens, however, when I package the program and run it on a Spark standalone cluster. That is, it does not happen when I run it inside IntelliJ, i.e. in Spark local mode (development mode).

The exception I get is the following:

Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.elsevier.entellect.commons.package$RawData

Note that RawData is a case class of my own, located in one of the subprojects of my multi-project build.

To understand the context, please find more details below:

2 - The build.sbt:
lazy val commonSettings = Seq(
  organization := "com.elsevier.entellect",
  version := "0.1.0-SNAPSHOT",
  scalaVersion := "2.11.12",
  resolvers += Resolver.mavenLocal,
  updateOptions := updateOptions.value.withLatestSnapshots(false)
)

lazy val entellectextractors = (project in file("."))
  .settings(commonSettings)
  .aggregate(entellectextractorscommon, entellectextractorsfetchers, entellectextractorsmappers, entellectextractorsconsumers)

lazy val entellectextractorscommon = project
  .settings(
    commonSettings,
    libraryDependencies ++= Seq(
      "com.esotericsoftware" % "kryo" % "5.0.0-RC1",
      "com.github.romix.akka" %% "akka-kryo-serialization" % "0.5.0" excludeAll(excludeJpountz),
      "org.apache.kafka" % "kafka-clients" % "1.0.1",
      "com.typesafe.akka" %% "akka-stream" % "2.5.16",
      "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.4",
      "com.typesafe.akka" % "akka-slf4j_2.11" % "2.5.16",
      "ch.qos.logback" % "logback-classic" % "1.2.3"
    )
  )

lazy val entellectextractorsfetchers = project
  .settings(
    commonSettings,
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-stream-kafka" % "0.22",
      "com.typesafe.slick" %% "slick" % "3.2.3",
      "com.typesafe.slick" %% "slick-hikaricp" % "3.2.3",
      "com.lightbend.akka" %% "akka-stream-alpakka-slick" % "0.20"
    )
  )
  .dependsOn(entellectextractorscommon)

lazy val entellectextractorsconsumers = project
  .settings(
    commonSettings,
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-stream-kafka" % "0.22"
    )
  )
  .dependsOn(entellectextractorscommon)

lazy val entellectextractorsmappers = project
  .settings(
    commonSettings,
    mainClass in assembly := Some("entellect.extractors.mappers.NormalizedDataMapper"),
    assemblyMergeStrategy in assembly := {
      case PathList("META-INF", "services", "org.apache.spark.sql.sources.DataSourceRegister") => MergeStrategy.concat
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
      case x => MergeStrategy.first
    },
    dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.9.5",
    dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.9.5",
    dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.9.5",
    dependencyOverrides += "org.apache.jena" % "apache-jena" % "3.8.0",
    libraryDependencies ++= Seq(
      "org.apache.jena" % "apache-jena" % "3.8.0",
      "edu.isi" % "karma-offline" % "0.0.1-SNAPSHOT",
      "org.apache.spark" % "spark-core_2.11" % "2.3.1" % "provided",
      "org.apache.spark" % "spark-sql_2.11" % "2.3.1" % "provided",
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1"
      //"com.datastax.cassandra" % "cassandra-driver-core" % "3.5.1"
    )
  )
  .dependsOn(entellectextractorscommon)

lazy val excludeJpountz = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")
The subproject that contains the Spark code is entellectextractorsmappers; the subproject that contains the case class RawData which cannot be found is entellectextractorscommon. entellectextractorsmappers explicitly depends on entellectextractorscommon.

3 - The difference between submitting to the local standalone cluster and running in local development mode:

When I submit to the cluster, my Spark dependencies are as follows:
"org.apache.spark" % "spark-core_2.11" % "2.3.1" % "provided",
"org.apache.spark" % "spark-sql_2.11" % "2.3.1" % "provided",
When I run in local development mode (no submit script), they turn into:
"org.apache.spark" % "spark-core_2.11" % "2.3.1",
"org.apache.spark" % "spark-sql_2.11" % "2.3.1",
That is, in local development mode I need to have the dependencies on the classpath, while when submitting to the cluster in standalone mode they are already on the cluster, so I mark them as provided.
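(Aside: a common way to avoid flipping these lines by hand is to keep Spark as provided for the assembly but add the provided dependencies back to the run classpath for local runs. The setting below is a sketch of that approach, the workaround usually suggested alongside sbt-assembly; it is not part of the build above.)

// Sketch (assumption): keep spark-core/spark-sql "provided" for assembly, but make
// `sbt run` use the Compile classpath, which still contains provided dependencies.
run in Compile := Defaults.runTask(
  fullClasspath in Compile,
  mainClass in (Compile, run),
  runner in (Compile, run)
).evaluated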
4 - How I submit:
spark-submit --class entellect.extractors.mappers.DeNormalizedDataMapper --name DeNormalizedDataMapper --master spark://MaatPro.local:7077 --deploy-mode cluster --executor-memory 14G --num-executors 1 --conf spark.sql.shuffle.partitions=7 "/Users/maatari/IdeaProjects/EntellectExtractors/entellectextractorsmappers/target/scala-2.11/entellectextractorsmappers-assembly-0.1.0-SNAPSHOT.jar"
5 - How I use Kryo:

5.1 - Declaration and registration

In the entellectextractorscommon project, I have a package object that contains the following:
package com.elsevier.entellect

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.util.Pool
import com.romix.scala.serialization.kryo.{ScalaImmutableAbstractMapSerializer, ScalaProductSerializer}

package object commons {

  case class RawData(modelName: String,
                     modelFile: String,
                     sourceType: String,
                     deNormalizedVal: String,
                     normalVal: Map[String, String])

  object KryoContext {
    lazy val kryoPool = new Pool[Kryo](true, false, 16) {
      protected def create(): Kryo = {
        val kryo = new Kryo()
        kryo.setRegistrationRequired(false)
        kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableAbstractMapSerializer])
        kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableAbstractMapSerializer])
        kryo.addDefaultSerializer(classOf[RawData], classOf[ScalaProductSerializer])
        kryo
      }
    }

    lazy val outputPool = new Pool[Output](true, false, 16) {
      protected def create: Output = new Output(4096)
    }

    lazy val inputPool = new Pool[Input](true, false, 16) {
      protected def create: Input = new Input(4096)
    }
  }

  object ExecutionContext {
    implicit lazy val system = ActorSystem()
    implicit lazy val mat = ActorMaterializer()
    implicit lazy val ec = system.dispatcher
  }
}
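The encode side is not shown above; for completeness, here is a minimal sketch of how the producer side could serialize a RawData with these same pools. The helper name encodeData and its shape are assumptions for illustration, not code from the project:

import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.util.Pool

// Hypothetical mirror of decodeData: borrow a Kryo and an Output from the pools,
// write the class and the object, and return the resulting bytes.
def encodeData(data: RawData, kryoPool: Pool[Kryo], outputPool: Pool[Output]): Array[Byte] = {
  val kryo = kryoPool.obtain()
  val output = outputPool.obtain()
  val bytes = new ByteArrayOutputStream()
  output.setOutputStream(bytes)
  kryo.writeClassAndObject(output, data)
  output.flush()
  kryoPool.free(kryo)
  outputPool.free(output)
  bytes.toByteArray
}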
5.2 - Usage

In entellectextractorsmappers (where the Spark program lives) I work with mapPartitions. Inside it, I have a method that decodes the data coming from Kafka and that uses Kryo:
def decodeData(rowOfBinaryList: List[Row], kryoPool: Pool[Kryo], inputPool: Pool[Input]): List[RawData] = {
  val kryo = kryoPool.obtain()
  val input = inputPool.obtain()
  val data = rowOfBinaryList.map(r => r.getAs[Array[Byte]]("message")).map { binaryMsg =>
    input.setInputStream(new ByteArrayInputStream(binaryMsg))
    val value = kryo.readClassAndObject(input).asInstanceOf[RawData]
    input.close()
    value
  }
  kryoPool.free(kryo)
  inputPool.free(input)
  data
}
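For reference, this is roughly how decodeData gets wired into the streaming job. It is a sketch under assumptions of my own: the Kafka topic, bootstrap servers, the "message" alias and the per-partition grouping are illustrative, not taken from the actual program, and decodeData is assumed to be in scope:

import org.apache.spark.sql.{Row, SparkSession}
import com.elsevier.entellect.commons._   // RawData, KryoContext

val spark = SparkSession.builder().appName("DeNormalizedDataMapper").getOrCreate()
import spark.implicits._

// Assumed Kafka source: topic name and servers are illustrative.
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "raw-data")
  .load()
  .selectExpr("value AS message")   // binary payload, matching r.getAs[Array[Byte]]("message")

// Decode each partition with the pooled Kryo instances; decodeData is assumed in scope.
val decoded = kafkaDf.mapPartitions { rows =>
  decodeData(rows.toList, KryoContext.kryoPool, KryoContext.inputPool).iterator
}
// ...followed by decoded.writeStream(...) to actually start the query.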
Note: the object KryoContext together with the lazy vals ensures that kryoPool is instantiated only once per JVM. I don't think the problem comes from that, however.

I have read hints elsewhere about classloader issues between Spark and Kryo, but I am not sure I really understand what is going on.

If someone could give me some pointers, that would help, because I have no idea where to start. Why does it work in local mode but not in cluster mode? Does the provided scope mess with the dependencies and cause some issue with Kryo? Is it the sbt-assembly merge strategy that messes things up?

Lots of possibilities; if someone could help me narrow it down, that would be great!
So far, I have solved this issue by picking up the "enclosing" classloader, which I presume is the one set up by Spark. This came after reading a few comments about classloader problems between Kryo and Spark: presumably, in cluster mode Spark loads the application (fat) jar through its own classloader, while a bare new Kryo() defaults to a classloader that cannot see the classes inside that jar, so setting it explicitly fixes the lookup:
lazy val kryoPool = new Pool[Kryo](true, false, 16) {
  protected def create(): Kryo = {
    val cl = Thread.currentThread().getContextClassLoader()
    val kryo = new Kryo()
    kryo.setClassLoader(cl)
    kryo.setRegistrationRequired(false)
    kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableAbstractMapSerializer])
    kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableAbstractMapSerializer])
    kryo.addDefaultSerializer(classOf[RawData], classOf[ScalaProductSerializer])
    kryo
  }
}