在Scala / Spark的HDFS上将文件从一个文件夹移动到另一个文件夹

osk*_*osk 3 hadoop scala hdfs apache-spark

我有两个路径,一个用于文件,一个用于文件夹。我想将文件移到HDFS上的该文件夹中。我该如何在Scala中做到这一点?我也在用Spark

奖金,如果相同的代码将适用于Windows的路径工作太,就像读/ HDFS上写文件,但不是必需的。

我尝试了以下方法:

val fs = FileSystem.get(sc.hadoopConfiguration)
fs.moveFromLocalFile(something, something2)
Run Code Online (Sandbox Code Playgroud)

我得到以下错误:

线程“主”中的异常java.lang.IllegalArgumentException:错误的FS:hdfs:/user/o/datasets/data.txt,预期的是:file:///

同去的moveToLocalFile(),因为他们是为了传输文件的文件系统之间,没有一个文件系统中。我也尝试过,fs.rename()但这根本没有做任何事情(没有错误或任何错误)。

我基本上都在一个目录下创建文件(流写入到它们),一旦他们这样做,他们需要移动到不同的目录。这种不同的目录由星火流媒体监控和我有一些问题,当星火流尝试工作,没有完成的文件

Sah*_*sai 7

试试下面的Scala代码。

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()
val hdfs = FileSystem.get(hadoopConf)

val srcPath = new Path(srcFilePath)
val destPath = new Path(destFilePath)

hdfs.copyFromLocalFile(srcPath, destPath)
Run Code Online (Sandbox Code Playgroud)

您还应该检查Spark是否在conf / spark-env.sh文件中设置了HADOOP_CONF_DIR变量。这将确保Spark将找到Hadoop配置设置。

build.sbt文件的依赖项:

libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.6.0"
libraryDependencies += "org.apache.commons" % "commons-io" % "1.3.2"
libraryDependencies += "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"
Run Code Online (Sandbox Code Playgroud)

要么

您可以使用来自Apache Commons的IOUtils将数据从InputStream复制到OutputStream

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.commons.io.IOUtils;



val hadoopconf = new Configuration();
val fs = FileSystem.get(hadoopconf);

//Create output stream to HDFS file
val outFileStream = fs.create(new Path("hdfs://<namenode>:<port>/output_path"))

//Create input stream from local file
val inStream = fs.open(new Path("hdfs://<namenode>:<port>/input_path"))

IOUtils.copy(inStream, outFileStream)

//Close both files
inStream.close()
outFileStream.close()
Run Code Online (Sandbox Code Playgroud)