Spark - GraphX: mapReduceTriplets vs. aggregateMessages

Ste*_*ane 1 mapreduce apache-spark spark-graphx

I am working through the tutorial at http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html

At one point the tutorial uses the mapReduceTriplets operation, and it returns the expected result:

// Find the oldest follower for each user
val oldestFollower: VertexRDD[(String, Int)] = userGraph.mapReduceTriplets[(String, Int)](
  // For each edge send a message to the destination vertex with the attribute of the source vertex
  edge => Iterator((edge.dstId, (edge.srcAttr.name, edge.srcAttr.age))),
  // To combine messages take the message for the older follower
  (a, b) => if (a._2 > b._2) a else b
)
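To see what the deprecated call does, here is a simplified in-memory model of mapReduceTriplets semantics in plain Scala (no Spark needed). It is a sketch, not GraphX's actual implementation: `Triplet`, `User` and `MapReduceTripletsModel` are hypothetical stand-ins. Each triplet is mapped to an `Iterator` of `(vertexId, message)` pairs, and all messages addressed to the same vertex are combined pairwise with the reduce function.

```scala
// Simplified model of mapReduceTriplets semantics (NOT GraphX's implementation).
object MapReduceTripletsModel {
  // Hypothetical stand-ins for GraphX's VertexId, vertex attribute and EdgeTriplet
  type VertexId = Long
  final case class User(name: String, age: Int)
  final case class Triplet(srcId: VertexId, dstId: VertexId, srcAttr: User, dstAttr: User)

  def mapReduceTriplets[A](
      triplets: Seq[Triplet],
      mapFunc: Triplet => Iterator[(VertexId, A)],
      reduceFunc: (A, A) => A): Map[VertexId, A] =
    triplets
      .flatMap(mapFunc)  // each triplet emits zero or more (targetVertex, msg) pairs
      .groupBy(_._1)     // collect the messages addressed to each vertex
      .map { case (id, msgs) => id -> msgs.map(_._2).reduce(reduceFunc) } // merge them
}
```

The key difference from aggregateMessages is the shape of the map function: here it returns an Iterator of addressed messages, whereas aggregateMessages passes in an `EdgeContext` and you call `sendToDst`/`sendToSrc` on it instead of returning anything. Vertices that receive no message do not appear in the result.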

But IntelliJ reports that mapReduceTriplets is deprecated (as of 1.2.0) and should be replaced by aggregateMessages:

// Find the oldest follower for each user
val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages()[(String, Int)](
  // For each edge send a message to the destination vertex with the attribute of the source vertex
  edge => Iterator((edge.dstId, (edge.srcAttr.name, edge.srcAttr.age))),
  // To combine messages take the message for the older follower
  (a, b) => if (a._2 > b._2) a else b
)

So I ran exactly the same code, but got no output. Is this expected, or do I need to change something because of the switch to aggregateMessages?

Dmi*_*nov 5

Perhaps you need something like this:

// Keep "(" on the same line as the call: a newline before "(" ends the statement in Scala
val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages[(String, Int)](
    // For each edge send a message to the destination vertex with the attribute of the source vertex
    // (sendToDst takes one message, so pass the tuple explicitly)
    sendMsg = { triplet => triplet.sendToDst((triplet.srcAttr.name, triplet.srcAttr.age)) },
    // To combine messages take the message for the older follower
    mergeMsg = { (a, b) => if (a._2 > b._2) a else b }
)
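For a complete, runnable version of the aggregateMessages call, here is a sketch of a small standalone Spark application. It assumes spark-core and spark-graphx (1.2 or later) on the classpath and a local master; the simplified `User(name, age)` class, the `OldestFollowerExample` object and the sample data are all hypothetical, chosen only to exercise the call. An edge `(src, dst)` is read as "src follows dst".

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}

// Hypothetical simplified vertex attribute (the tutorial's user class has more fields)
case class User(name: String, age: Int)

object OldestFollowerExample {
  // The aggregateMessages call from the answer, wrapped in a function
  def oldestFollower(userGraph: Graph[User, Int]): VertexRDD[(String, Int)] =
    userGraph.aggregateMessages[(String, Int)](
      // Send the follower's (name, age) from the source vertex to the destination vertex
      sendMsg = triplet => triplet.sendToDst((triplet.srcAttr.name, triplet.srcAttr.age)),
      // When a vertex receives several messages, keep the one from the older follower
      mergeMsg = (a, b) => if (a._2 > b._2) a else b
    )

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("oldest-follower").setMaster("local[*]"))
    try {
      // Hypothetical sample data
      val users = sc.parallelize(Seq[(VertexId, User)](
        (1L, User("Alice", 28)), (2L, User("Bob", 27)),
        (3L, User("Charlie", 65)), (4L, User("David", 42))))
      val follows = sc.parallelize(Seq(Edge(2L, 1L, 1), Edge(4L, 1L, 1), Edge(3L, 2L, 1)))
      val graph = Graph(users, follows)

      // Nothing is printed unless an action such as collect() actually runs the job
      oldestFollower(graph).collect().sortBy(_._1).foreach { case (id, (name, age)) =>
        println(s"User $id: oldest follower is $name ($age)")
      }
    } finally sc.stop()
  }
}
```

Run locally, it should print one line for user 1 (David, 42) and one for user 2 (Charlie, 65). Users 3 and 4 have no followers, so they receive no messages and are absent from the result, just as with mapReduceTriplets.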

You can find the aggregateMessages function signature and some useful examples on the GraphX Programming Guide page. Hope this helps.
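For reference, the signature on `Graph` (Spark 1.2+) is `aggregateMessages[A: ClassTag](sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A, tripletFields: TripletFields = TripletFields.All): VertexRDD[A]`. The optional third argument tells GraphX which triplet fields `sendMsg` reads, so it can avoid shipping unused attributes. The sketch below assumes, for brevity, that vertices carry a plain `(name, age)` tuple rather than the tutorial's user class, and passes `TripletFields.Src` because only the source attribute is read; the `OldestFollowerSrcOnly` object is hypothetical.

```scala
import org.apache.spark.graphx.{Graph, TripletFields, VertexRDD}

object OldestFollowerSrcOnly {
  // Vertices are assumed to carry (name, age) tuples; the edge attribute type is irrelevant here
  def oldestFollower[ED](g: Graph[(String, Int), ED]): VertexRDD[(String, Int)] =
    g.aggregateMessages[(String, Int)](
      ctx => ctx.sendToDst(ctx.srcAttr),    // forward the follower's (name, age) as the message
      (a, b) => if (a._2 > b._2) a else b,  // keep the older follower
      TripletFields.Src                     // sendMsg reads only the source vertex attribute
    )
}
```

If `sendMsg` later starts reading `ctx.dstAttr` or `ctx.attr`, the third argument has to be widened (for example back to the default `TripletFields.All`).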