我想从hdfs位置读取一堆文本文件,并使用spark在迭代中对其执行映射.
JavaRDD<String> records = ctx.textFile(args[1], 1); 能够一次只读取一个文件.
我想读取多个文件并将它们作为单个RDD处理.怎么样?
sam*_*est 288
您可以指定整个目录,使用通配符甚至CSV目录和通配符.例如:
sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")
Run Code Online (Sandbox Code Playgroud)
正如Nick Chammas所指出的,这是Hadoop的曝光,FileInputFormat因此这也适用于Hadoop(和Scalding).
clo*_*oud 35
使用union方法如下:
val sc = new SparkContext(...)
val r1 = sc.textFile("xxx1")
val r2 = sc.textFile("xxx2")
...
val rdds = Seq(r1, r2, ...)
val bigRdd = sc.union(rdds)
Run Code Online (Sandbox Code Playgroud)
然后bigRdd是包含所有文件的RDD.
小智 28
您可以使用单个textFile调用来读取多个文件.斯卡拉:
sc.textFile(','.join(files))
Run Code Online (Sandbox Code Playgroud)
你可以用它
首先,您可以获得S3路径的缓冲区/列表:
import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest
def listFiles(s3_bucket:String, base_prefix : String) = {
var files = new ArrayList[String]
//S3 Client and List Object Request
var s3Client = new AmazonS3Client();
var objectListing: ObjectListing = null;
var listObjectsRequest = new ListObjectsRequest();
//Your S3 Bucket
listObjectsRequest.setBucketName(s3_bucket)
//Your Folder path or Prefix
listObjectsRequest.setPrefix(base_prefix)
//Adding s3:// to the paths and adding to a list
do {
objectListing = s3Client.listObjects(listObjectsRequest);
for (objectSummary <- objectListing.getObjectSummaries().asScala) {
files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
}
listObjectsRequest.setMarker(objectListing.getNextMarker());
} while (objectListing.isTruncated());
//Removing Base Directory Name
files.remove(0)
//Creating a Scala List for same
files.asScala
}
Run Code Online (Sandbox Code Playgroud)
现在将此List对象传递给下面的代码段,注意:sc是SQLContext的对象
var df: DataFrame = null;
for (file <- files) {
val fileDf= sc.textFile(file)
if (df!= null) {
df= df.unionAll(fileDf)
} else {
df= fileDf
}
}
Run Code Online (Sandbox Code Playgroud)
现在你有一个最终的统一RDD即df
可选,您也可以在一个BigRDD中重新分配它
val files = sc.textFile(filename, 1).repartition(1)
Run Code Online (Sandbox Code Playgroud)
重新分区始终有效:D
| 归档时间: |
|
| 查看次数: |
137444 次 |
| 最近记录: |