cra*_*man 5 scala apache-spark
我正在学习 Spark(在 Scala 中),并一直试图弄清楚如何计算文件每一行上的所有单词。我正在使用一个数据集,其中每行包含一个制表符分隔的 document_id 和文档的全文
doc_1 <full-text>
doc_2 <full-text>
etc..
Run Code Online (Sandbox Code Playgroud)
这是我在名为 doc.txt 的文件中的玩具示例
doc_1 new york city new york state
doc_2 rain rain go away
Run Code Online (Sandbox Code Playgroud)
我认为我需要做的是转换成包含ig的元组
((doc_id, word), 1)
Run Code Online (Sandbox Code Playgroud)
然后调用reduceByKey()对1进行求和。我写了以下内容:
val file = sc.textFile("docs.txt")
val tuples = file.map(_.split("\t"))
.map( x => (x(1).split("\\s+")
.map(y => ((x(0), y), 1 )) ) )
Run Code Online (Sandbox Code Playgroud)
这确实给了我我认为我需要的中间表示:
tuples.collect
res0: Array[Array[((String, String), Int)]] = Array(Array(((doc_1,new),1), ((doc_1,york),1), ((doc_1,city),1), ((doc_1,new),1), ((doc_1,york),1), ((doc_1,state),1)), Array(((doc_2,rain),1), ((doc_2,rain),1), ((doc_2,go),1), ((doc_2,away),1)))
Run Code Online (Sandbox Code Playgroud)
但是如果在元组上调用reduceByKey则会产生错误
tuples.reduceByKey(_ + )
<console>:21: error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[Array[((String, String), Int)]]
tuples.reduceByKey(_ + )
Run Code Online (Sandbox Code Playgroud)
我似乎不知道如何做到这一点。我想我需要对数组内的数组进行减少。我尝试了很多不同的事情,但像上面的那样不断出现错误并且没有取得任何进展。对此的任何指导/建议将不胜感激。
注意:我知道https://spark.apache.org/examples.html上有一个字数统计示例,展示了如何获取文件中所有单词的计数。但这是针对整个输入文件的。我说的是获取每个文档的计数,其中每个文档位于不同的行。
reduceByKey需要类型,RDD[(K,V)]而当您split在第一个中执行 时map,您最终会得到一个RDD[Array[...]],这不是所需的类型签名。您可以按如下方式重新设计当前的解决方案...但它可能不会那么高效(在使用重新设计的代码后阅读flatMap):
//Dummy data load
val file = sc.parallelize(List("doc_1\tnew york city","doc_2\train rain go away"))
//Split the data on tabs to get an array of (key, line) tuples
val firstPass = file.map(_.split("\t"))
//Split the line inside each tuple so you now have an array of (key, Array(...))
//Where the inner array is full of (word, 1) tuples
val secondPass = firstPass.map(x=>(x(0), x(1).split("\\s+").map(y=>(y,1))))
//Now group the words and re-map so that the inner tuple is the wordcount
val finalPass = secondPass.map(x=>(x._1, x._2.groupBy(_._1).map(y=>(y._1,y._2.size))))
Run Code Online (Sandbox Code Playgroud)
可能是更好的解决方案 vvvv :
如果你想保留当前的结构,那么你需要更改为Tuple2从一开始就使用 a,然后使用 a flatMapafter:
//Load your data
val file = sc.parallelize(List("doc_1\tnew york city","doc_2\train rain go away"))
//Turn the data into a key-value RDD (I suggest caching the split, kept 1 line for SO)
val firstPass = file.map(x=>(x.split("\t")(0), x.split("\t")(1)))
//Change your key to be a Tuple2[String,String] and the value is the count
val tuples = firstPass.flatMap(x=>x._2.split("\\s+").map(y=>((x._1, y), 1)))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6088 次 |
| 最近记录: |