and*_*ani 6 hadoop apache-kafka apache-spark spark-streaming apache-spark-sql
我SparkStreaming通过将数据转换为数据帧来将数据写入 HDFS:
代码
object KafkaSparkHdfs {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
sparkConf.set("spark.driver.allowMultipleContexts", "true");
val sc = new SparkContext(sparkConf)
def main(args: Array[String]): Unit = {
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val ssc = new StreamingContext(sparkConf, Seconds(20))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "stream3",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("fridaydata")
val stream = KafkaUtils.createDirectStream[String, String](
ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
)
val lines = stream.map(consumerRecord => consumerRecord.value)
val words = lines.flatMap(_.split(" "))
val wordMap = words.map(word => (word, 1))
val wordCount = wordMap.reduceByKey(_ + _)
wordCount.foreachRDD(rdd => {
val dataframe = rdd.toDF();
dataframe.write
.mode(SaveMode.Append)
.save("hdfs://localhost:9000/newfile24")
})
ssc.start()
ssc.awaitTermination()
}
}
Run Code Online (Sandbox Code Playgroud)
文件夹已创建,但文件未写入。
该程序因以下错误而终止:
18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
at java.lang.Thread.run(Thread.java:748)
18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Run Code Online (Sandbox Code Playgroud)
在我的 pom 中,我使用各自的依赖项:
该错误是由于尝试同时运行多个 Spark 上下文造成的。设置allowMultipleContexts为 true 主要用于测试目的,不鼓励使用。因此,解决您问题的方法是确保SparkContext在任何地方都使用相同的方法。从代码中我们可以看出,SparkContext( sc)是用来创建一个的SQLContext,这样就可以了。但是,创建时StreamingContext不使用它,而是SparkConf使用。
通过查看文档我们看到:
通过提供新 SparkContext 所需的配置来创建 StreamingContext
换句话说,通过使用SparkConf作为参数,SparkContext将创建一个新的。现在有两个不同的上下文。
这里最简单的解决方案是继续使用与以前相同的上下文。将创建的行更改为StreamingContext:
val ssc = new StreamingContext(sc, Seconds(20))
Run Code Online (Sandbox Code Playgroud)
注意:在较新版本的 Spark (2.0+) 中请SparkSession改为使用。然后可以使用创建新的流上下文StreamingContext(spark.sparkContext, ...)。它可以如下所示:
val spark = SparkSession().builder
.setMaster("local[*]")
.setAppName("SparkKafka")
.getOrCreate()
import sqlContext.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
21094 次 |
| 最近记录: |