Ath*_*kur 2 scala amazon-s3 apache-spark
我在 s3 文件夹中有 Spark 输出,我想将所有 s3 文件从该输出文件夹移动到另一个位置,但在移动时我想重命名这些文件。
例如,我在 S3 文件夹中有文件,如下所示
现在我想重命名所有文件并放入另一个目录中,但文件的名称如下所示
Fundamental.FinancialStatement.FinancialStatementLineItems.Japan.1971-BAL.1.2017-10-18-0439.Full.txt
Fundamental.FinancialStatement.FinancialStatementLineItems.Japan.1971-BAL.2.2017-10-18-0439.Full.txt
Fundamental.FinancialStatement.FinancialStatementLineItems.Japan.1971-BAL.3.2017-10-18-0439.Full.txt
Run Code Online (Sandbox Code Playgroud)
这里 Fundamental.FinancialStatement 在所有文件的 2017-10-18-0439当前日期时间中都是常量。
这是我到目前为止所尝试过的,但无法获取文件夹名称并循环遍历所有文件
import org.apache.hadoop.fs._
val src = new Path("s3://trfsmallfffile/Segments/output")
val dest = new Path("s3://trfsmallfffile/Segments/Finaloutput")
val conf = sc.hadoopConfiguration // assuming sc = spark context
val fs = src.getFileSystem(conf)
//val file = fs.globStatus(new Path("src/DataPartition=Japan/part*.gz"))(0).getPath.getName
//println(file)
val status = fs.listStatus(src)
status.foreach(filename => {
val a = filename.getPath.getName.toString()
println("file name"+a)
//println(filename)
})
Run Code Online (Sandbox Code Playgroud)
这给了我以下输出
file nameDataPartition=Japan
file nameDataPartition=SelfSourcedPrivate
file nameDataPartition=SelfSourcedPublic
file name_SUCCESS
Run Code Online (Sandbox Code Playgroud)
这为我提供了文件夹详细信息,而不是文件夹内的文件。
参考资料取自这里Stack Overflow Reference
您正在获取目录,因为您在 s3 中有子目录级别。
/*/* to go in subdir .
Run Code Online (Sandbox Code Playgroud)
尝试这个
import org.apache.hadoop.fs._
val src = new Path("s3://trfsmallfffile/Segments/Output/*/*")
val dest = new Path("s3://trfsmallfffile/Segments/FinalOutput")
val conf = sc.hadoopConfiguration // assuming sc = spark context
val fs = src.getFileSystem(conf)
val file = fs.globStatus(new Path("s3://trfsmallfffile/Segments/Output/*/*"))
for (urlStatus <- file) {
//println("S3 FILE PATH IS ===:" + urlStatus.getPath)
val partitioName=urlStatus.getPath.toString.split("=")(1).split("\\/")(0).toString
val finalPrefix="Fundamental.FinancialLineItem.Segments."
val finalFileName=finalPrefix+partitioName+".txt"
val dest = new Path("s3://trfsmallfffile/Segments/FinalOutput"+"/"+finalFileName+ " ")
fs.rename(urlStatus.getPath, dest)
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
7771 次 |
| 最近记录: |