use*_*562 6 hadoop apache-spark
在给定目录中,我有许多不同的文件夹,在每个文件夹中我都有Hadoop文件(part_001等等).
directory
-> folder1
-> part_001...
-> part_002...
-> folder2
-> part_001...
...
Run Code Online (Sandbox Code Playgroud)
给定目录,如何递归读取此目录中所有文件夹的内容,并使用Scala将此内容加载到Spark中的单个RDD中?
我发现了这个,但它没有递归进入子文件夹(我正在使用import org.apache.hadoop.mapreduce.lib.input):
var job: Job = null
try {
job = Job.getInstance()
FileInputFormat.setInputPaths(job, new Path("s3n://" + bucketNameData + "/" + directoryS3))
FileInputFormat.setInputDirRecursive(job, true)
} catch {
case ioe: IOException => ioe.printStackTrace(); System.exit(1);
}
val sourceData = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).values
Run Code Online (Sandbox Code Playgroud)
我也发现这个网页使用SequenceFile,但同样,我不知道如何将它应用到我的情况?
dbu*_*osp 10
如果您使用Spark,可以使用wilcards执行此操作,如下所示:
scala>sc.textFile("path/*/*")
Run Code Online (Sandbox Code Playgroud)
sc是SparkContext,如果使用spark-shell,默认情况下会初始化,或者如果你要创建自己的程序,则必须自己实例化一个SparkContext.
小心以下标志:
scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")res6:String = null
哟应该将此标志设置为true:
sc.hadoopConfiguration.set( "mapreduce.input.fileinputformat.input.dir.recursive", "真")
我发现参数必须这样设置:
.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
10260 次 |
| 最近记录: |