我有一个格式为 RDD[((Long, Long), (Long, Long))] 的 RDD,我需要转换或转换为 RDD[((Long, Long), (Long, Long, Long, Long))]其中第二个 RDD 元组基于第一个 RDD 的函数。
我正在尝试实现这个基于地图的功能,但是,我认为我在这里做错了。请帮我解决这个问题。
这是完整的代码:
package com.ranker.correlation.listitem
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import scala.collection.Map
class ListItemCorrelation(sc: SparkContext) extends Serializable {
def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) = {
if (dirX.equals(1)) {
if (dirY.equals(1)) {
return (1, 0, 0, 0)
} else {
return (0, 1, 0, 0)
}
} else {
if (dirY.equals(1)) {
return (0, 0, 1, 0)
} …
Run Code Online (Sandbox Code Playgroud)