Hav*_*nar 21 scala apache-spark
我使用以下命令用一堆包含2个字符串["filename","content"]的数组填充RDD.
现在,我希望迭代每个事件,以便对每个文件名和内容执行某些操作.
val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")
Run Code Online (Sandbox Code Playgroud)
我似乎无法找到有关如何执行此操作的任何文档.
所以我想要的是:
foreach occurrence-in-the-rdd{
//do stuff with the array found on loccation n of the RDD
}
Run Code Online (Sandbox Code Playgroud)
Spi*_*lov 26
您可以在RDD上调用接受函数作为参数的各种方法.
// set up an example -- an RDD of arrays
val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
val sc = new SparkContext(sparkConf)
val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
val testRDD = sc.parallelize(testData, 2)
// Print the RDD of arrays.
testRDD.collect().foreach(a => println(a.size))
// Use map() to create an RDD with the array sizes.
val countRDD = testRDD.map(a => a.size)
// Print the elements of this new RDD.
countRDD.collect().foreach(a => println(a))
// Use filter() to create an RDD with just the longer arrays.
val bigRDD = testRDD.filter(a => a.size > 3)
// Print each remaining array.
bigRDD.collect().foreach(a => {
a.foreach(e => print(e + " "))
println()
})
}
Run Code Online (Sandbox Code Playgroud)
请注意,您编写的函数接受单个RDD元素作为输入,并返回某种统一类型的数据,因此您创建后一种类型的RDD.例如,countRDD
是一个RDD[Int]
,而bigRDD
仍然是一个RDD[Array[Int]]
.
在某些时候写一个foreach
修改一些其他数据可能很诱人,但你应该抵制这个问题和答案中描述的原因.
编辑:不要尝试打印大RDD
s
一些读者询问了如何使用collect()
和println()
查看他们的结果,如上例所示.当然,这只适用于你在Spark REPL(read-eval-print-loop)之类的交互模式下运行.最好调用collect()
RDD来获得顺序打印的顺序数组.但是collect()
可能会带回太多数据,无论如何都可能会打印太多数据.RDD
如果它们很大,可以使用以下方法深入了解您的s:
RDD.take()
:这可以很好地控制你获得的元素数量,但不能控制它们的来源 - 定义为"第一"元素,这是一个概念,由其他各种问题和答案处理.
// take() returns an Array so no need to collect()
myHugeRDD.take(20).foreach(a => println(a))
Run Code Online (Sandbox Code Playgroud)RDD.sample()
:这可以让你(粗略地)控制你得到的结果部分,无论采样是否使用替换,甚至是可选的随机数种子.
// sample() does return an RDD so you may still want to collect()
myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
Run Code Online (Sandbox Code Playgroud)RDD.takeSample()
:这是混合:使用您可以控制的随机抽样,但两者都允许您指定结果的确切数量并返回Array
.
// takeSample() returns an Array so no need to collect()
myHugeRDD.takeSample(true, 20).foreach(a => println(a))
Run Code Online (Sandbox Code Playgroud)RDD.count()
:有时候最好的洞察力来自你最终获得了多少元素 - 我经常这样做.
println(myHugeRDD.count())
Run Code Online (Sandbox Code Playgroud)Spark中的基本操作是map
和filter
.
val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }
Run Code Online (Sandbox Code Playgroud)
的txtRDD
意志现在只包含有扩展名为".txt"文件
如果你想对这些文件进行字数统计,你可以说
//split the documents into words in one long list
val words = txtRDD flatMap { case (id,text) => text.split("\\s+") }
// give each word a count of 1
val wordT = words map (x => (x,1))
//sum up the counts for each word
val wordCount = wordsT reduceByKey((a, b) => a + b)
Run Code Online (Sandbox Code Playgroud)
您希望在mapPartitions
需要执行一些昂贵的初始化时使用- 例如,如果您想使用像Stanford coreNLP工具这样的库执行命名实体识别.
硕士map
,filter
,flatMap
,和reduce
,你是用自己的方式来掌握的火花.
归档时间: |
|
查看次数: |
74374 次 |
最近记录: |