Ath*_*kur 1 scala amazon-s3 amazon-web-services apache-spark apache-zeppelin
我在S3中存储了大约一百万个文本文件。我想根据文件夹名称重命名所有文件。
我如何在Spark-Scala中做到这一点?
我正在寻找一些示例代码。
我正在使用齐柏林飞艇来运行我的spark脚本。
下面的代码我已经尝试从答案中建议
import org.apache.hadoop.fs._
val src = new Path("s3://trfsmallfffile/FinancialLineItem/MAIN")
val dest = new Path("s3://trfsmallfffile/FinancialLineItem/MAIN/dest")
val conf = sc.hadoopConfiguration // assuming sc = spark context
val fs = Path.getFileSystem(conf)
fs.rename(src, dest)
Run Code Online (Sandbox Code Playgroud)
但是低于错误
<console>:110: error: value getFileSystem is not a member of object org.apache.hadoop.fs.Path
val fs = Path.getFileSystem(conf)
Run Code Online (Sandbox Code Playgroud)
您可以使用普通的HDFS API,例如(输入,未经测试)
val src = new Path("s3a://bucket/data/src")
val dest = new Path("s3a://bucket/data/dest")
val conf = sc.hadoopConfiguration // assuming sc = spark context
val fs = src.getFileSystem(conf)
fs.rename(src, dest)
Run Code Online (Sandbox Code Playgroud)
该方式S3A客户假货的重命名是copy + delete每一个文件的,所以花费的时间是成正比的循环移位#of文件和数据量。S3限制了您的工作:如果尝试并行执行此操作,则可能会使您减速。如果需要“一段时间”,请不要感到惊讶。
您还会按每次COPY通话收取费用,每1,000个通话0.005,因此尝试大约要花费$ 5。在一个小的目录上进行测试,直到确定一切正常
| 归档时间: |
|
| 查看次数: |
2152 次 |
| 最近记录: |