在 Spark 中计算逐点互信息

Del*_*lip 5 apache-spark apache-spark-mllib

我正在尝试计算逐点互信息(PMI)。

在此处输入图片说明

我在这里分别为 p(x, y) 和 p(x) 定义了两个 RDD:

pii: RDD[((String, String), Double)]
 pi: RDD[(String, Double)]
Run Code Online (Sandbox Code Playgroud)

任何我写来计算PMI从RDDS代码piipi不漂亮。我的方法是首先压平 RDDpiipi在按摩元组元素时加入两次。

pii: RDD[((String, String), Double)]
 pi: RDD[(String, Double)]
Run Code Online (Sandbox Code Playgroud)

显然,这很糟糕。有没有更好的(惯用的)方法来做到这一点?注意:我可以通过存储 log-probs 来优化日志pipii但选择以这种方式编写以保持问题清晰。

eme*_*eth 5

使用broadcast将是一个解决方案。

val bcPi = pi.context.broadcast(pi.collectAsMap())
val pmi = pii.map {
  case ((x, y), pxy) =>
    (x, y) -> computePMI(pxy, bcPi.value.get(x).get, bcPi.value.get(y).get)
}
Run Code Online (Sandbox Code Playgroud)

假设:pihas allxyin pii