如何将多个文本文件读入单个RDD?

use*_*662 171 apache-spark

我想从hdfs位置读取一堆文本文件,并使用spark在迭代中对其执行映射.

JavaRDD<String> records = ctx.textFile(args[1], 1); 能够一次只读取一个文件.

我想读取多个文件并将它们作为单个RDD处理.怎么样?

sam*_*est 288

您可以指定整个目录,使用通配符甚至CSV目录和通配符.例如:

sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")
Run Code Online (Sandbox Code Playgroud)

正如Nick Chammas所指出的,这是Hadoop的曝光,FileInputFormat因此这也适用于Hadoop(和Scalding).

  • 是的,这是将多个文件作为单个RDD打开的最便捷方式.这里的API只是[Hadoop的FileInputFormat API](http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.html)的曝光,所以所有相同的`Path`选项都适用. (10认同)
  • `sc.wholeTextFiles`对于没有行分隔的数据很方便 (7认同)
  • 我终于找到了这个邪恶的文件模式匹配是如何工作的 http://stackoverflow.com/a/33917492/306488 所以我不再需要逗号分隔了 (2认同)

clo*_*oud 35

使用union方法如下:

val sc = new SparkContext(...)
val r1 = sc.textFile("xxx1")
val r2 = sc.textFile("xxx2")
...
val rdds = Seq(r1, r2, ...)
val bigRdd = sc.union(rdds)
Run Code Online (Sandbox Code Playgroud)

然后bigRdd是包含所有文件的RDD.


小智 28

您可以使用单个textFile调用来读取多个文件.斯卡拉:

sc.textFile(','.join(files)) 
Run Code Online (Sandbox Code Playgroud)

  • 我认为这是_only_ python语法.Scala等价物是`sc.textFile(files.mkString(","))` (7认同)
  • 和相同的python语法 (5认同)

Mur*_*ala 9

你可以用它

首先,您可以获得S3路径的缓冲区/列表:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }
Run Code Online (Sandbox Code Playgroud)

现在将此List对象传递给下面的代码段,注意:sc是SQLContext的对象

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }
Run Code Online (Sandbox Code Playgroud)

现在你有一个最终的统一RDD即df

可选,您也可以在一个BigRDD中重新分配它

val files = sc.textFile(filename, 1).repartition(1)
Run Code Online (Sandbox Code Playgroud)

重新分区始终有效:D

  • 我们可以并行化读取列出文件的操作吗?像sc.parallelize这样的东西? (2认同)