如何在Spark/Scala中使用频率计数的文本文件创建一个二元组?

osc*_*arm 5 scala n-gram apache-spark

我想要一个文本文件并创建一个没有用"."分隔的所有单词的二元组,删除任何特殊字符.我正在尝试使用Spark和Scala来做到这一点.

本文:

朋友你好.怎么是
你今天?再见,我的朋友.

应该产生以下内容:

你好,
我的朋友,2
你怎么样,1
你今天,1
今天再见,1
再见我,1

ohr*_*uus 8

对于RDD中的每一行,首先根据分割开始'.'.然后通过拆分对每个生成的子串进行标记' '.标记化后,删除特殊字符replaceAll并转换为小写.这些子列表中的每一个都可以转换sliding为包含bigrams的字符串数组的迭代器.

然后,在mkString按照要求展平并将双字母组数组转换为字符串之后,用groupBy和计算每个数组的计数mapValues.

最后压平,减少并收集RDD中的(二元组,计数)元组.

val rdd = sc.parallelize(Array("Hello my Friend. How are",
                               "you today? bye my friend."))

rdd.map{ 

    // Split each line into substrings by periods
    _.split('.').map{ substrings =>

        // Trim substrings and then tokenize on spaces
        substrings.trim.split(' ').

        // Remove non-alphanumeric characters, using Shyamendra's
        // clean replacement technique, and convert to lowercase
        map{_.replaceAll("""\W""", "").toLowerCase()}.

        // Find bigrams
        sliding(2)
    }.

    // Flatten, and map the bigrams to concatenated strings
    flatMap{identity}.map{_.mkString(" ")}.

    // Group the bigrams and count their frequency
    groupBy{identity}.mapValues{_.size}

}.

// Reduce to get a global count, then collect
flatMap{identity}.reduceByKey(_+_).collect.

// Format and print
foreach{x=> println(x._1 + ", " + x._2)}

you today, 1
hello my, 1
my friend, 2
how are, 1
bye my, 1
today bye, 1    
Run Code Online (Sandbox Code Playgroud)