I'm trying to implement a function that returns the intersection of two RDDs by comparing a given attribute:
def intersect[T](left: RDD[Article], right: RDD[Article])(by: Article => (T, Article)) = {
  val a: RDD[(T, Article)] = left.map(by)
  val b: RDD[(T, Article)] = right.map(by)
  a.join(b).map { case (attr, (leftItem, rightItem)) => leftItem }
}
However, sbt throws the following error during compilation:
Error:(128, 7) value join is not a member of org.apache.spark.rdd.RDD[(T, org.example.Article)]
a.join(b).map { case (attr, (leftItem, rightItem)) => leftItem }
^
If I hard-code the type, everything works fine. Any idea why I get this error?
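To illustrate, a hard-coded variant like this one (key type fixed to String purely for illustration) compiles without complaint:

// Compiles: with concrete key and value types, the compiler can supply
// the implicits that the RDD-to-PairRDDFunctions conversion needs.
def intersect(left: RDD[Article], right: RDD[Article])(by: Article => (String, Article)) = {
  val a: RDD[(String, Article)] = left.map(by)
  val b: RDD[(String, Article)] = right.map(by)
  a.join(b).map { case (attr, (leftItem, rightItem)) => leftItem }
}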
UPDATE
It seems Scala cannot apply the implicit conversion from RDD[(T, Article)] to PairRDDFunctions[K, V], but I don't know why.
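The conversion in question is defined on the RDD companion object (in Spark 1.3+; older versions required import org.apache.spark.SparkContext._), roughly as follows. Note the implicit ClassTag parameters:

// Excerpt (slightly simplified) from org.apache.spark.rdd.RDD's companion object:
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] =
  new PairRDDFunctions(rdd)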
UPDATE
If I modify the code like this:
def intersect[T](left: RDD[Article], right: RDD[Article])(by: Article => (T, Article)) = {
  val a: PairRDDFunctions[T, Article] = left.map(by)
  val b: RDD[(T, Article)] = right.map(by)
  a.join(b).map { case (attr, (leftItem, rightItem)) => leftItem }
}
I get another error:
[error] No ClassTag available for T
[error] val a: PairRDDFunctions[T, Article] = left.map(by)
In the end, I managed to solve this with a ClassTag. As in Java, types are erased at runtime, so the compiler cannot prove that an RDD[(T, Article)] can be implicitly converted to PairRDDFunctions[T, Article]: the conversion requires ClassTag instances for the key and value types, and none is available for an unconstrained T. Adding a ClassTag context bound, essentially syntactic sugar for an extra implicit parameter that carries the type information to runtime, fixes it:
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def intersect[T: ClassTag](left: RDD[Article], right: RDD[Article])(by: Article => T) = {
  val a: RDD[(T, Article)] = left.map(t => (by(t), t))
  val b: RDD[(T, Article)] = right.map(t => (by(t), t))
  a.join(b).map { case (attr, (leftItem, rightItem)) => leftItem }
}
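A minimal usage sketch, assuming a hypothetical Article case class with a title field and an existing SparkContext sc:

case class Article(title: String, author: String)

val left  = sc.parallelize(Seq(Article("Spark", "Alice"), Article("Scala", "Bob")))
val right = sc.parallelize(Seq(Article("Spark", "Carol")))

// Keep the articles from `left` whose title also occurs in `right`
val common = intersect(left, right)(_.title)
common.collect()  // Array(Article("Spark", "Alice"))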
Run Code Online (Sandbox Code Playgroud)
We can even turn this into an implicit class:
implicit class RichRDD[T: ClassTag](rdd: RDD[T]) {
  def intersect[P: ClassTag](that: RDD[T])(by: T => P) = {
    val a: RDD[(P, T)] = rdd.map(t => (by(t), t))
    val b: RDD[(P, T)] = that.map(t => (by(t), t))
    a.join(b).map { case (attr, (leftItem, rightItem)) => leftItem }
  }
}
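With RichRDD in scope, the call site reads naturally (same hypothetical Article as above):

val common = left.intersect(right)(_.title)  // resolves to RichRDD.intersect, not RDD.intersection
common.collect()  // Array(Article("Spark", "Alice"))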