如何在Hadoop/Spark中重命名大量文件?

Yan*_*eld 6 parallel-processing hadoop bigdata apache-spark

我有一个包含+100,000个文件的输入文件夹.

我想对它们进行批量操作,即以某种方式重命名所有这些操作,或者根据每个文件名称中的信息将它们移动到新路径.

我想使用Spark来做到这一点,但不幸的是,当我尝试下面这段代码时:

 final org.apache.hadoop.fs.FileSystem ghfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(args[0]), new org.apache.hadoop.conf.Configuration());
        org.apache.hadoop.fs.FileStatus[] paths = ghfs.listStatus(new org.apache.hadoop.fs.Path(args[0]));
        List<String> pathsList = new ArrayList<>();
        for (FileStatus path : paths) {
            pathsList.add(path.getPath().toString());
        }
        JavaRDD<String> rddPaths = sc.parallelize(pathsList);

        rddPaths.foreach(new VoidFunction<String>() {
            @Override
            public void call(String path) throws Exception {
                Path origPath = new Path(path);
                Path newPath = new Path(path.replace("taboola","customer"));
                ghfs.rename(origPath,newPath);
            }
        });
Run Code Online (Sandbox Code Playgroud)

我得到一个错误,hadoop.fs.FileSystem不是Serializable(因此可能不能用于并行操作)

知道如何解决它或以其他方式完成它吗?

Dav*_*vid 4

问题是您正在尝试序列化 ghfs 对象。如果您使用 mapPartitions 并在每个分区中重新创建 ghfs 对象,您只需进行一些小的更改就可以运行代码。