I am using Hadoop 2.4.1 and Spark 1.1.0. I uploaded the food reviews dataset from here to HDFS, and then used the following code to read the file and process it in the Spark shell:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
var path = "hdfs:///user/hduser/finefoods.txt"
val conf = new Configuration
conf.set("textinputformat.record.delimiter", "\n\n")
var dataset = sc.newAPIHadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).map(_._2.toString)
var datasetObj = dataset.map{ rowStr => rowStr.split("\n")}
var tupleSet = datasetObj.map( strArr => strArr.map( elm => elm.split(": ")(1))).map( arr => (arr(0),arr(1),arr(4).toDouble))
tupleSet.groupBy(t => t._2)
When I run the last line, tupleSet.groupBy(t => t._2), the Spark shell throws the following exception:
scala> tupleSet.groupBy( t => t._2).first()
14/11/15 22:46:59 INFO spark.SparkContext: Starting job: first at <console>:28
14/11/15 …
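The log above is cut off, so the actual cause is not visible here. One thing worth ruling out is the parsing step: `elm.split(": ")(1)` throws an `ArrayIndexOutOfBoundsException` for any line that has no `": "` separator or whose value is empty (Java's `String.split` drops trailing empty strings), and because RDD transformations are lazy, such a failure would only surface when an action like `first()` runs. Below is a minimal sketch of a more defensive parser in plain Scala. The `ReviewParser` object and `parseRecord` name are hypothetical, and it assumes each record has at least five `key: value` lines with a numeric value in the fifth, as the original code implies.

```scala
import scala.util.Try

// Hypothetical helper, not part of the original code.
object ReviewParser {
  // Parses one blank-line-delimited record into (field 0, field 1, field 4 as Double),
  // mirroring the tuple built in the question. Returns None for malformed records
  // instead of throwing.
  def parseRecord(record: String): Option[(String, String, Double)] = {
    // limit = 2 keeps an empty value ("key: ") and any later ": " inside the value
    val pairs = record.split("\n").map(_.split(": ", 2))
    if (pairs.length > 4 && pairs.forall(_.length == 2)) {
      val values = pairs.map(_(1))
      // toDouble can still fail on non-numeric text, so wrap it in Try
      Try(values(4).toDouble).toOption.map(score => (values(0), values(1), score))
    } else {
      None
    }
  }
}
```

In the Spark shell this could replace the two `map` calls with something like `dataset.flatMap(r => ReviewParser.parseRecord(r))`, which silently skips records that do not parse. If the exception persists after that, the parsing step is probably not the culprit.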