Cha*_*n B 4 hadoop hadoop-partitioning apache-spark hadoop2 apache-spark-sql
我正在研究基于 Scala 的 Apache Spark 实现,用于将数据从远程位置加载到 HDFS,然后将数据从 HDFS 摄取到 Hive 表。
使用我的第一个 Spark 作业,我已将数据/文件加载到 HDFS 中的某个位置 -
hdfs://sandbox.hortonworks.com:8020/data/analytics/raw/ 文件夹
让我们考虑一下,在加入 CT_Click_Basic.csv 和 CT_Click_Basic1.csv.gz 文件后,我在 HDFS 中有以下文件 [共享位置的文件名将是此处的文件夹名称,其内容将出现在部分 xxxxx 文件中]:
[root@sandbox ~]# hdfs dfs -ls /data/analytics/raw/*/ 找到 3 项
-rw-r--r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS
-rw-r--r-- 3 chauhan.bhupesh hdfs 8383 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00000
-rw-r--r-- 3 chauhan.bhupesh hdfs 8395 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00001
找到 2 件商品
-rw-r--r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS
-rw-r--r-- 3 chauhan.bhupesh hdfs 16588 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000
现在使用我的另一个 Spark 作业,我想根据每个阶段执行的任务将这些文件从/raw文件夹移动到/process,最后移动到HDFS 中的/archive文件夹。
为此,我首先使用以下代码获取/raw文件夹下存在的所有文件的列表:
def listAllFilesFolderInDir(filePath:String,recursiveTraverse:Boolean,filePaths: ListBuffer[Path]) : ListBuffer[Path] = {
val files = GlobalContext.hdfs.listStatus(new Path(filePath))
files.foreach { fileStatus => {
if(!fileStatus.isDirectory()) {
filePaths+=fileStatus.getPath()
}
else {
listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
}
}
}
filePaths
}
Run Code Online (Sandbox Code Playgroud)
然后使用以下代码行,我尝试将 /raw 文件夹中的文件重命名/移动到 /process 文件夹:
var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
var pathSplit = path.toString().split("/")
var pathSplitSize = pathSplit.size
val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
fs.rename(new Path(rawFileName), new Path(processFileName))
}
Run Code Online (Sandbox Code Playgroud)
但我无法使用上面编写的代码移动/重命名这些文件。我尝试调试代码,发现 fs.rename() 返回“false”。
请注意:当我在 /data/analytics/raw 文件夹 ex CT.csv [或任何其他文件] 中手动复制任何文件,然后运行 fs.rename() 时,我能够实现文件重命名/移动,但它不起作用对于 Part-xxxxx 文件。
我缺少什么吗?
任何快速帮助将不胜感激。
问候,布佩什
最后,我得到了这个问题。实际上,我试图将文件从 /data/analytics/raw/folder.csv/part-xxxxx 重命名为 /data/analytics/process/folder.csv/part-xxxxx,其中 /data/analytics/process 存在于 HDFS 中,但是“文件夹.csv”不存在;因此,它在重命名时返回错误。我在代码中添加了以下行并且对我来说效果很好
var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
var pathSplit = path.toString().split("/")
var pathSplitSize = pathSplit.size
val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
var processFolderName = outputDir + "/" + pathSplit(pathSplitSize-2)
var processFolderPath = new Path(processFolderName)
if(!(fs.exists(processFolderPath)))
fs.mkdirs(processFolderPath)
val processFileName = processFolderName + "/" + pathSplit(pathSplitSize-1)
fs.rename(new Path(rawFileName), new Path(processFileName))
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
5102 次 |
最近记录: |