Gna*_*ana 7 streaming scala apache-spark spark-streaming scala-streams
I wrote a Kafka streaming program in Scala and ran it on a Spark standalone cluster. The code works fine on my local machine. I have set up Kafka, Cassandra and Spark on an Azure VM, and I opened all inbound and outbound ports to rule out blocked ports.
Start the master
sbin> ./start-master.sh
Start the slave
sbin# ./start-slave.sh spark://vm-hostname:7077
I verified the status in the master web UI.
Submit the job
bin# ./spark-submit --class xyStreamJob --master spark://vm-hostname:7077 /home/user/appl.jar
I noticed that the application was added and shown in the web UI.
I published some messages to the topic, but nothing was received or saved to the Cassandra DB.
I clicked the application name in the master web console and found that the Streaming tab is not available on the application's console page.
Why does the application not work on the VM when it runs fine locally?
How can I debug the issue on the VM?
def main(args: Array[String]): Unit = {
  val spark = SparkHelper.getOrCreateSparkSession()
  val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
  spark.sparkContext.setLogLevel("WARN")

  // One direct stream per partition of the input topic "hello".
  val kafkaStream = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "vmip:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "loc",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("hello")
    val numPartitionsOfInputTopic = 3
    val streams = (1 to numPartitionsOfInputTopic).map { _ =>
      KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    }
    streams
  }

  kafkaStream.foreach(stream => {
    stream.foreachRDD(rdd => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreach(record => {
        try {
          // Print the record and persist it to Cassandra.
          println(record.value().trim)
          CassandraHelper.saveItemEvent(record.value().trim)
        } catch {
          case ex: Exception => println(ex.getMessage)
        }
      })
      // Commit the processed offsets back to Kafka.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
    println("Read Msg")
  })
  println("Spark parallel reader is ready !!!")

  ssc.start()
  ssc.awaitTermination()
}

def getSparkConf(): SparkConf = {
  val conf = new SparkConf(true)
    .setAppName("TestAppl")
    .set("spark.cassandra.connection.host", "vmip")
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
    .setMaster("spark://vm-hostname:7077")
  conf
}
Versions
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion,
  "org.apache.kafka" %% "kafka" % "0.10.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
)
mergeStrategy in assembly := {
case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
case x => (mergeStrategy in assembly).value(x)
}
To debug your issue, first make sure that messages are actually getting through Kafka. To do that, open port 9092 on the VM and try to consume directly from Kafka:
bin/kafka-console-consumer.sh --bootstrap-server vmip:9092 --topic hello --from-beginning
The --from-beginning option consumes everything up to the maximum retention configured on the Kafka topic.
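If you prefer to verify connectivity from code, here is a minimal sketch (assuming the same vmip:9092 broker and hello topic from the question; the object name and group id are just illustrative) that polls the topic with the plain Kafka consumer API and prints whatever it receives:

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaConnectivityCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "vmip:9092")        // same broker the Spark job uses
    props.put("group.id", "connectivity-check")        // throwaway consumer group
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)
    props.put("auto.offset.reset", "earliest")         // behaves like --from-beginning

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("hello"))

    // Poll a few times; if nothing prints here, the problem is between the
    // producer and Kafka, not inside the Spark streaming job.
    (1 to 5).foreach { _ =>
      consumer.poll(2000L).asScala.foreach { record =>
        println(s"partition=${record.partition} offset=${record.offset} value=${record.value}")
      }
    }
    consumer.close()
  }
}

Running it once on the VM itself and once from the machine where the Spark driver runs helps separate broker reachability from Spark configuration.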
Also check that you do not have two versions of Spark installed on the VM; if you do, you need to use spark2-submit to submit a Spark 2 job.
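If two Spark installations might be on the box, one quick way to see which one the job actually runs against (the object name below is illustrative, not from the original post) is to submit a tiny job that only prints the runtime versions, using the exact same submit command as the streaming job:

import org.apache.spark.sql.SparkSession

object SparkVersionCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkVersionCheck")
      .getOrCreate()
    // Compare this against the 2.2.0 / Scala 2.11 the jar was built for.
    println(s"Spark version seen by the job: ${spark.version}")
    println(s"Scala version: ${scala.util.Properties.versionString}")
    spark.stop()
  }
}

If this prints a 1.x version, the submit script on the path belongs to the older install, and spark2-submit (or the full path to the Spark 2 bin directory) should be used instead.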