如何在apache spark(scala)中迭代RDD

Hav*_*nar 21 scala apache-spark

我使用以下命令用一堆包含2个字符串["filename","content"]的数组填充RDD.

现在,我希望迭代每个事件,以便对每个文件名和内容执行某些操作.

val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")
Run Code Online (Sandbox Code Playgroud)

我似乎无法找到有关如何执行此操作的任何文档.

所以我想要的是:

foreach occurrence-in-the-rdd{
   //do stuff with the array found on loccation n of the RDD
} 
Run Code Online (Sandbox Code Playgroud)

Spi*_*lov 26

您可以在RDD上调用接受函数作为参数的各种方法.

// set up an example -- an RDD of arrays
val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
val sc = new SparkContext(sparkConf)
val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
val testRDD = sc.parallelize(testData, 2)

// Print the RDD of arrays.
testRDD.collect().foreach(a => println(a.size))

// Use map() to create an RDD with the array sizes.
val countRDD = testRDD.map(a => a.size)

// Print the elements of this new RDD.
countRDD.collect().foreach(a => println(a))

// Use filter() to create an RDD with just the longer arrays.
val bigRDD = testRDD.filter(a => a.size > 3)

// Print each remaining array.
bigRDD.collect().foreach(a => {
    a.foreach(e => print(e + " "))
    println()
  })
}
Run Code Online (Sandbox Code Playgroud)

请注意,您编写的函数接受单个RDD元素作为输入,并返回某种统一类型的数据,因此您创建后一种类型的RDD.例如,countRDD是一个RDD[Int],而bigRDD仍然是一个RDD[Array[Int]].

在某些时候写一个foreach修改一些其他数据可能很诱人,但你应该抵制这个问题和答案中描述的原因.

编辑:不要尝试打印大RDDs

一些读者询问了如何使用collect()println()查看他们的结果,如上例所示.当然,这只适用于你在Spark REPL(read-eval-print-loop)之类的交互模式下运行.最好调用collect()RDD来获得顺序打印的顺序数组.但是collect()可能会带回太多数据,无论如何都可能会打印太多数据.RDD如果它们很大,可以使用以下方法深入了解您的s:

  1. RDD.take():这可以很好地控制你获得的元素数量,但不能控制它们的来源 - 定义为"第一"元素,这是一个概念,由其他各种问题和答案处理.

    // take() returns an Array so no need to collect()
    myHugeRDD.take(20).foreach(a => println(a))
    
    Run Code Online (Sandbox Code Playgroud)
  2. RDD.sample():这可以让你(粗略地)控制你得到的结果部分,无论采样是否使用替换,甚至是可选的随机数种子.

    // sample() does return an RDD so you may still want to collect()
    myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
    
    Run Code Online (Sandbox Code Playgroud)
  3. RDD.takeSample():这是混合:使用您可以控制的随机抽样,但两者都允许您指定结果的确切数量并返回Array.

    // takeSample() returns an Array so no need to collect() 
    myHugeRDD.takeSample(true, 20).foreach(a => println(a))
    
    Run Code Online (Sandbox Code Playgroud)
  4. RDD.count():有时候最好的洞察力来自你最终获得了多少元素 - 我经常这样做.

    println(myHugeRDD.count())       
    
    Run Code Online (Sandbox Code Playgroud)


Dav*_*vid 9

Spark中的基本操作是mapfilter.

val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }
Run Code Online (Sandbox Code Playgroud)

txtRDD意志现在只包含有扩展名为".txt"文件

如果你想对这些文件进行字数统计,你可以说

//split the documents into words in one long list
val words = txtRDD flatMap { case (id,text) => text.split("\\s+") }
// give each word a count of 1
val wordT = words map (x => (x,1))  
//sum up the counts for each word
val wordCount = wordsT reduceByKey((a, b) => a + b)
Run Code Online (Sandbox Code Playgroud)

您希望在mapPartitions需要执行一些昂贵的初始化时使用- 例如,如果您想使用像Stanford coreNLP工具这样的库执行命名实体识别.

硕士map,filter,flatMap,和reduce,你是用自己的方式来掌握的火花.