Spark: counting co-occurrences - an algorithm for efficient multi-pass filtering of large collections

zor*_*ork 7 algorithm filtering scala group-by apache-spark

I have a table with two columns, books and readers, where books and readers are book and reader IDs respectively:

   books readers
1:     1      30
2:     2      10
3:     3      20
4:     1      20
5:     1      10
6:     2      30

A record book = 1, reader = 30 means that the reader with id = 30 has read the book with id = 1. For each pair of books I need to count the number of readers who read both books, using this algorithm:

for each book
  for each reader of the book
    for each other_book in books of the reader
      increment common_reader_count ((book, other_book), cnt)

The advantage of this algorithm is that it takes far fewer operations than counting over all pairwise combinations of books.
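To make the intended counting concrete, here is a minimal plain-Scala sketch of the pseudocode above, run on the sample data without Spark; the object and value names are illustrative and not part of the original program:

object PairCountSketch extends App {
  // The six (book, reader) records from the table above.
  val recs = Seq((1, 30), (2, 10), (3, 20), (1, 20), (1, 10), (2, 30))

  // The two groupings that the Spark program below builds as RDDs.
  val readersOfBook = recs.groupBy(_._1).map { case (b, rs) => b -> rs.map(_._2) }
  val booksOfReader = recs.groupBy(_._2).map { case (r, bs) => r -> bs.map(_._1) }

  val counts = collection.mutable.Map.empty[(Int, Int), Int].withDefaultValue(0)
  for {
    (book, readers) <- readersOfBook          // for each book
    reader          <- readers                // for each reader of the book
    otherBook       <- booksOfReader(reader)  // for each other_book of the reader
    if otherBook != book
  } counts((book, otherBook)) += 1

  // The resulting counts are (1,2) -> 2, (2,1) -> 2, (1,3) -> 1, (3,1) -> 1;
  // each unordered pair appears twice because the loop visits both orderings.
  counts.foreach(println)
}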

To implement the algorithm above, I organize this data into two groupings: 1) an RDD keyed by book, containing the readers of each book, and 2) an RDD keyed by reader, containing the books read by each reader, as in the following program:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object Small {

  case class Book(book: Int, reader: Int)
  case class BookPair(book1: Int, book2: Int, cnt:Int)

  val recs = Array(
    Book(book = 1, reader = 30),
    Book(book = 2, reader = 10),
    Book(book = 3, reader = 20),
    Book(book = 1, reader = 20),
    Book(book = 1, reader = 10),
    Book(book = 2, reader = 30))

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setAppName("Test")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val data = sc.parallelize(recs)

    val bookMap = data.map(r => (r.book, r))
    val bookGrps = bookMap.groupByKey

    val readerMap = data.map(r => (r.reader, r))
    val readerGrps = readerMap.groupByKey

    // *** Calculate book pairs
    // Iterate book groups 
    val allBookPairs = bookGrps.map(bookGrp => bookGrp match {
      case (book, recIter) =>
        // Iterate user groups 
        recIter.toList.map(rec => {
          // Find readers for this book
          val aReader = rec.reader
          // Find all books (including this one) that this reader read
          val allReaderBooks = readerGrps.filter(readerGrp => readerGrp match {
            case (reader2, recIter2) => reader2 == aReader
          })
          val bookPairs = allReaderBooks.map(readerTuple => readerTuple match {
            case (reader3, recIter3) => recIter3.toList.map(rec => ((book, rec.book), 1))
          })
          bookPairs
        })

    })
    val x = allBookPairs.flatMap(identity)
    val y = x.map(rdd => rdd.first)
    val z = y.flatMap(identity)
    val p = z.reduceByKey((cnt1, cnt2) => cnt1 + cnt2)
    val result = p.map(bookPair => bookPair match {
      case((book1, book2),cnt) => BookPair(book1, book2, cnt)
    } )

    val resultCsv = result.map(pair => resultToStr(pair))
    resultCsv.saveAsTextFile("./result.csv")
  }

  def resultToStr(pair: BookPair): String = {
    val sep = "|"
    pair.book1 + sep + pair.book2 + sep + pair.cnt
  }
}

This implementation, however, actually turns into a different and inefficient algorithm:

for each book
  find each reader of the book scanning all readers every time!
    for each other_book in books of the reader
      increment common_reader_count ((book, other_book), cnt)

This defeats the main purpose of the algorithm discussed above: instead of reducing the number of operations, it increases it, because finding the books of a reader requires filtering the whole set of readers for every book. So the number of operations is ~ N*M, where N is the number of readers and M is the number of books.

Questions:

  1. Is there a way to implement the original algorithm in Spark without filtering the complete reader collection for every book?
  2. Are there any other algorithms to compute book pairs efficiently?
  3. Also, when actually running this code I get a filter exception whose cause I cannot figure out. Any ideas?

Please see the exception log below:

15/05/29 18:24:05 WARN util.Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
15/05/29 18:24:05 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/29 18:24:09 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/29 18:24:10 INFO Remoting: Starting remoting
15/05/29 18:24:10 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.2.15:38910]
15/05/29 18:24:10 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@10.0.2.15:38910]
15/05/29 18:24:12 ERROR executor.Executor: Exception in task 0.0 in stage 6.0 (TID 4)
java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
    at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:58)
    at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:54)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at Small$$anonfun$4.apply(Small.scala:54)
    at Small$$anonfun$4.apply(Small.scala:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
    at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

Update:

This code:

val df = sc.parallelize(Array((1,30),(2,10),(3,20),(1,10),(2,30))).toDF("books","readers")
val results = df.join(
    df.select($"books" as "r_books", $"readers" as "r_readers"),
    $"readers" === $"r_readers" and $"books" < $"r_books"
  )
  .groupBy($"books", $"r_books")
  .agg($"books", $"r_books", count($"readers"))

gives the following result:

books r_books COUNT(readers)
1     2       2     

So COUNT here is the number of times the two books (here 1 and 2) were read together (the count of pairs).

Dav*_*fin 9

This sort of thing is a lot easier if you convert the original RDD to a DataFrame:

val df = sc.parallelize(
  Array((1,30),(2,10),(3,20),(1,10), (2,30))
).toDF("books","readers")

Once you have done that, just do a self-join on the DataFrame to make book pairs, then count how many readers have read each book pair:

val results = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"), 
  $"readers" === $"r_readers" and $"books" < $"r_books"
).groupBy(
  $"books", $"r_books"
).agg(
  $"books", $"r_books", count($"readers")
)

As for some additional explanation of that join, note that I am joining df back onto itself - a self-join: df.join(df.select(...), ...). What you are looking to do is to stitch book #1 - $"books" - together with a second book - $"r_books" - read by the same reader - $"readers" === $"r_readers". But if you joined only on $"readers" === $"r_readers", you would get each book joined back to itself. Instead, I use $"books" < $"r_books" to ensure that the ordering in the book pairs is always (<lower_id>, <higher_id>).
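To see what the join produces before the aggregation, you can inspect it separately; this is just an illustration assuming the same df and imports as above, and the name joined is mine:

val joined = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
)
joined.show()
// For the sample data this should contain one row per (book pair, shared reader):
// readers 30 and 10 each read both book 1 and book 2, giving two rows for the
// pair (1, 2); reader 20 read only book 3 and contributes no pair. The groupBy/agg
// step then collapses those two rows into the single (1, 2, 2) result shown above.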

Once you have done the join, you get a DataFrame with one row for every reader of every book pair. The groupBy and agg functions then do the actual counting of the number of readers per book pairing.

Incidentally, if a reader has read the same book twice, I believe you would end up double-counting, which may or may not be what you want. If that is not what you want, just change count($"readers") to countDistinct($"readers").
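For reference, a minimal sketch of that countDistinct variant, assuming the same df as above and that countDistinct has been imported from org.apache.spark.sql.functions; the name distinctResults is mine:

val distinctResults = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
).groupBy(
  $"books", $"r_books"
).agg(
  countDistinct($"readers")  // each shared reader counts once per book pair
)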

If you want to learn more about the agg functions count() and countDistinct(), as well as a bunch of other fun stuff, check out the scaladoc for org.apache.spark.sql.functions.
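Back at the RDD level, the same pair-counting idea can also be sketched without the nested per-book filter from the question's program: group the records by reader once and emit the book pairs locally. This is only a sketch, assuming the recs array and the imports from the question's program; the names byReader and pairCounts are mine:

val byReader = sc.parallelize(recs).map(r => (r.reader, r.book)).groupByKey()

val pairCounts = byReader.flatMap { case (_, books) =>
  val bs = books.toSeq
  for {
    b1 <- bs
    b2 <- bs
    if b1 < b2               // keep each pair once, as (<lower_id>, <higher_id>)
  } yield ((b1, b2), 1)
}.reduceByKey(_ + _)
// For the question's six records this yields ((1,2),2) and ((1,3),1).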