如何在Hadoop 3.0中进行CopyMerge?

Jer*_*emy 12 java hadoop

我知道hadoop的版本2.7FileUtilcopyMerge对合并多个文件到一个新的功能.

但是copyMerge3.0版本的API不再支持该功能.

有关如何将目录中的所有文件合并到3.0hadoop版本中的新单个文件的任何想法?

rav*_*avi 7

FileUtil#copyMerge方法已被删除.查看主要更改的详细信息:

https://issues.apache.org/jira/browse/HADOOP-12967

https://issues.apache.org/jira/browse/HADOOP-11392

你可以使用getmerge

用法:hadoop fs -getmerge [-nl]

将源目录和目标文件作为输入,并将src中的文件连接到目标本地文件.可选地,-nl可以设置为允许在每个文件的末尾添加换行符(LF).-skip-empty-file可用于在空文件的情况下避免不需要的换行符.

例子:

hadoop fs -getmerge -nl /src /opt/output.txt
hadoop fs -getmerge -nl /src/file1.txt /src/file2.txt /output.txt
Run Code Online (Sandbox Code Playgroud)

退出代码:成功时返回0,错误时返回非零.

https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#getmerge

  • 那是我的恐惧.我想知道为什么copyMerge在最新版本中删除了. (3认同)

Tag*_*gar 7

我有同样的问题,不得不重新实现 copyMerge(虽然在 PySpark 中,但使用与原始 copyMerge 相同的 API 调用)。

不知道为什么 Hadoop 3 中没有等效的功能。我们必须经常将文件从 HDFS 目录合并到 HDFS 文件。

这是我在上面引用的 pySpark 中的实现 https://github.com/Tagar/stuff/blob/master/copyMerge.py


Xav*_*hot 6

由于FileUtil.copyMerge()已被弃用,从原料药的起始版本3去除,一个简单的解决方案包括重新实施它自己。

是以前版本中的Java原始实现。

这是一个Scala重写:

import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils
import java.io.IOException

def copyMerge(
    srcFS: FileSystem, srcDir: Path,
    dstFS: FileSystem, dstFile: Path,
    deleteSource: Boolean, conf: Configuration
): Boolean = {

  if (dstFS.exists(dstFile))
    throw new IOException(s"Target $dstFile already exists")

  // Source path is expected to be a directory:
  if (srcFS.getFileStatus(srcDir).isDirectory()) {

    val outputFile = dstFS.create(dstFile)
    Try {
      srcFS
        .listStatus(srcDir)
        .sortBy(_.getPath.getName)
        .collect {
          case status if status.isFile() =>
            val inputFile = srcFS.open(status.getPath())
            Try(IOUtils.copyBytes(inputFile, outputFile, conf, false))
            inputFile.close()
        }
    }
    outputFile.close()

    if (deleteSource) srcFS.delete(srcDir, true) else true
  }
  else false
}
Run Code Online (Sandbox Code Playgroud)

  • 为我工作...谢谢! (2认同)