Spark: using join with a generic RDD

Fra*_*oth 2 scala apache-spark

I'm trying to implement a function that returns the intersection of two RDDs by comparing a given attribute.

  def intersect[T](left: RDD[Article], right: RDD[Article])(by: Article => (T,Article)) = {
    val a: RDD[(T, Article)] = left.map(by)
    val b: RDD[(T, Article)] = right.map(by)
    a.join(b).map { case (attr, (leftItem, rightItem)) => leftItem }
  }

However, during compilation sbt throws the following error:

Error:(128, 7) value join is not a member of org.apache.spark.rdd.RDD[(T, org.example.Article)]
    a.join(b).map { case (attr, (leftItem, rightItem)) => leftItem }
      ^

If I hardcode the type, everything works fine. Any idea why I get this error?
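For illustration (String here is just a stand-in for any concrete key type), a hardcoded version like the one below compiles without complaint:

    // Compiles: with a concrete key type, join is available on the pair RDD.
    def intersectByKey(left: RDD[Article], right: RDD[Article])(by: Article => (String, Article)) = {
      val a: RDD[(String, Article)] = left.map(by)
      val b: RDD[(String, Article)] = right.map(by)
      a.join(b).map { case (attr, (leftItem, rightItem)) => leftItem }
    }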

UPDATE

It seems Scala cannot apply the implicit conversion from RDD[(T, Article)] to PairRDDFunctions[K, V], but I don't know why.
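For reference, the conversion Scala is looking for is declared in Spark's RDD companion object with roughly this shape (quoted approximately, not verbatim):

    implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
        (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V]

Note the implicit ClassTag parameters for K and V: an unconstrained type parameter T has no ClassTag to supply, so the conversion is not applied.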

UPDATE

If I modify the code like this:

  def intersect[T](left: RDD[Article], right: RDD[Article])(by: Article => (T,Article)) = {
    val a: PairRDDFunctions[T, Article] = left.map(by)
    val b: RDD[(T, Article)] = right.map(by)
    a.join(b).map { case (attr, (leftItem, rightItem)) => leftItem }
  }

I get another error:

[error]  No ClassTag available for T
[error]     val a: PairRDDFunctions[T, Article] = left.map(by)

Fra*_*oth 5

In the end, I managed to solve this with a ClassTag. Just as in Java, types are erased at runtime, so without extra help the compiler cannot find the implicit conversion that turns an RDD[(T, P)] into a PairRDDFunctions[T, P] (the class that provides join). A ClassTag works around this: it is essentially syntactic sugar for carrying the type information through to runtime:

  import org.apache.spark.rdd.RDD
  import scala.reflect.ClassTag

  // The ClassTag context bound on T lets Spark's implicit conversion to
  // PairRDDFunctions apply, so join is now available.
  def intersect[T: ClassTag](left: RDD[Article], right: RDD[Article])(by: Article => T) = {
    val a: RDD[(T, Article)] = left.map(t => (by(t), t))
    val b: RDD[(T, Article)] = right.map(t => (by(t), t))
    a.join(b).map { case (attr, (leftItem, rightItem)) => leftItem }
  }
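A quick usage sketch (the Article case class, the SparkContext sc, and the sample data are hypothetical placeholders):

    // Hypothetical example: intersect two RDD[Article] collections by title.
    case class Article(title: String, body: String)

    val left:  RDD[Article] = sc.parallelize(Seq(Article("a", "left"), Article("b", "left")))
    val right: RDD[Article] = sc.parallelize(Seq(Article("b", "right"), Article("c", "right")))

    val common: RDD[Article] = intersect(left, right)(_.title)  // contains the left-side "b"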

We can even turn it into an implicit class:

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

implicit class RichRDD[T: ClassTag](rdd: RDD[T]) {
  def intersect[P: ClassTag](that: RDD[T])(by: T => P) = {
    val a: RDD[(P, T)] = rdd.map(t => (by(t), t))
    val b: RDD[(P, T)] = that.map(t => (by(t), t))
    a.join(b).map { case (attr, (leftItem, rightItem)) => leftItem }
  }
}
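With the implicit class in scope, the call site reads more naturally (same hypothetical RDD[Article] values as above):

    val common = left.intersect(right)(_.title)

Naming it intersect does not clash with the built-in RDD.intersection, and the separate parameter list lets the key type P be inferred from the by function.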