fs.rename(新路径(rawFileName),新路径(processFileName))不起作用

Cha*_*n B 4 hadoop hadoop-partitioning apache-spark hadoop2 apache-spark-sql

我正在研究基于 Scala 的 Apache Spark 实现,用于将数据从远程位置加载到 HDFS,然后将数据从 HDFS 摄取到 Hive 表。

使用我的第一个 Spark 作业,我已将数据/文件加载到 HDFS 中的某个位置 -

hdfs://sandbox.hortonworks.com:8020/data/analytics/raw/ 文件夹

让我们考虑一下,在加入 CT_Click_Basic.csv 和 CT_Click_Basic1.csv.gz 文件后,我在 HDFS 中有以下文件 [共享位置的文件名将是此处的文件夹名称,其内容将出现在部分 xxxxx 文件中]:

[root@sandbox ~]# hdfs dfs -ls /data/analytics/raw/*/ 找到 3 项

-rw-r--r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS

-rw-r--r-- 3 chauhan.bhupesh hdfs 8383 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00000

-rw-r--r-- 3 chauhan.bhupesh hdfs 8395 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic.csv/part-00001

找到 2 件商品

-rw-r--r-- 3 chauhan.bhupesh hdfs 0 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS

-rw-r--r-- 3 chauhan.bhupesh hdfs 16588 2017-07-27 15:02 /data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000

现在使用我的另一个 Spark 作业,我想根据每个阶段执行的任务将这些文件从/raw文件夹移动到/process,最后移动到HDFS 中的/archive文件夹。

为此,我首先使用以下代码获取/raw文件夹下存在的所有文件的列表:

    def listAllFilesFolderInDir(filePath:String,recursiveTraverse:Boolean,filePaths: ListBuffer[Path]) : ListBuffer[Path] = {
val files = GlobalContext.hdfs.listStatus(new Path(filePath))
files.foreach { fileStatus => {
           if(!fileStatus.isDirectory()) {
                filePaths+=fileStatus.getPath()      
            }
            else {
                listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
            }
        }
  }   
  filePaths
}
Run Code Online (Sandbox Code Playgroud)

然后使用以下代码行,我尝试将 /raw 文件夹中的文件重命名/移动到 /process 文件夹:

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size
   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path(rawFileName), new Path(processFileName))
 }
Run Code Online (Sandbox Code Playgroud)

但我无法使用上面编写的代码移动/重命名这些文件。我尝试调试代码,发现 fs.rename() 返回“false”。

请注意:当我在 /data/analytics/raw 文件夹 ex CT.csv [或任何其他文件] 中手动复制任何文件,然后运行 ​​fs.rename() 时,我能够实现文件重命名/移动,但它不起作用对于 Part-xxxxx 文件。

我缺少什么吗?

任何快速帮助将不胜感激。

问候,布佩什

Cha*_*n B 5

最后,我得到了这个问题。实际上,我试图将文件从 /data/analytics/raw/folder.csv/part-xxxxx 重命名为 /data/analytics/process/folder.csv/part-xxxxx,其中 /data/analytics/process 存在于 HDFS 中,但是“文件夹.csv”不存在;因此,它在重命名时返回错误。我在代码中添加了以下行并且对我来说效果很好

var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
   var pathSplit = path.toString().split("/")
   var pathSplitSize = pathSplit.size

   val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)

   var processFolderName = outputDir + "/" + pathSplit(pathSplitSize-2)
   var processFolderPath = new Path(processFolderName)
   if(!(fs.exists(processFolderPath)))
         fs.mkdirs(processFolderPath)
   val processFileName = processFolderName + "/" + pathSplit(pathSplitSize-1)
   fs.rename(new Path(rawFileName), new Path(processFileName))
 }
Run Code Online (Sandbox Code Playgroud)