我知道hadoop的版本2.7的FileUtil有copyMerge对合并多个文件到一个新的功能.
但是copyMerge该3.0版本的API不再支持该功能.
有关如何将目录中的所有文件合并到3.0hadoop版本中的新单个文件的任何想法?
FileUtil#copyMerge方法已被删除.查看主要更改的详细信息:
https://issues.apache.org/jira/browse/HADOOP-12967
https://issues.apache.org/jira/browse/HADOOP-11392
你可以使用getmerge
用法:hadoop fs -getmerge [-nl]
将源目录和目标文件作为输入,并将src中的文件连接到目标本地文件.可选地,-nl可以设置为允许在每个文件的末尾添加换行符(LF).-skip-empty-file可用于在空文件的情况下避免不需要的换行符.
例子:
hadoop fs -getmerge -nl /src /opt/output.txt
hadoop fs -getmerge -nl /src/file1.txt /src/file2.txt /output.txt
Run Code Online (Sandbox Code Playgroud)
退出代码:成功时返回0,错误时返回非零.
我有同样的问题,不得不重新实现 copyMerge(虽然在 PySpark 中,但使用与原始 copyMerge 相同的 API 调用)。
不知道为什么 Hadoop 3 中没有等效的功能。我们必须经常将文件从 HDFS 目录合并到 HDFS 文件。
这是我在上面引用的 pySpark 中的实现 https://github.com/Tagar/stuff/blob/master/copyMerge.py
由于FileUtil.copyMerge()已被弃用,从原料药的起始版本3去除,一个简单的解决方案包括重新实施它自己。
这是以前版本中的Java原始实现。
这是一个Scala重写:
import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import java.io.IOException
def copyMerge(
srcFS: FileSystem, srcDir: Path,
dstFS: FileSystem, dstFile: Path,
deleteSource: Boolean, conf: Configuration
): Boolean = {
if (dstFS.exists(dstFile))
throw new IOException(s"Target $dstFile already exists")
// Source path is expected to be a directory:
if (srcFS.getFileStatus(srcDir).isDirectory()) {
val outputFile = dstFS.create(dstFile)
Try {
srcFS
.listStatus(srcDir)
.sortBy(_.getPath.getName)
.collect {
case status if status.isFile() =>
val inputFile = srcFS.open(status.getPath())
Try(IOUtils.copyBytes(inputFile, outputFile, conf, false))
inputFile.close()
}
}
outputFile.close()
if (deleteSource) srcFS.delete(srcDir, true) else true
}
else false
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6389 次 |
| 最近记录: |