kin*_*jou 4 scala apache-spark apache-spark-sql pyspark apache-spark-mllib
使用:
http://spark.apache.org/docs/1.6.1/mllib-frequent-pattern-mining.html
蟒蛇代码:
from pyspark.mllib.fpm import FPGrowth
model = FPGrowth.train(dataframe,0.01,10)
Run Code Online (Sandbox Code Playgroud)
斯卡拉:
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
val data = sc.textFile("data/mllib/sample_fpgrowth.txt")
val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))
val fpg = new FPGrowth()
.setMinSupport(0.2)
.setNumPartitions(10)
val model = fpg.run(transactions)
model.freqItemsets.collect().foreach { itemset =>
println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}
val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
println(
rule.antecedent.mkString("[", ",", "]")
+ " => " + rule.consequent .mkString("[", ",", "]")
+ ", " + rule.confidence)
}
Run Code Online (Sandbox Code Playgroud)
从这里的代码可以看出,scala 部分没有最低限度的信心。
def trainFPGrowthModel(
data: JavaRDD[java.lang.Iterable[Any]],
minSupport: Double,
numPartitions: Int): FPGrowthModel[Any] = {
val fpg = new FPGrowth()
.setMinSupport(minSupport)
.setNumPartitions(numPartitions)
val model = fpg.run(data.rdd.map(_.asScala.toArray))
new FPGrowthModelWrapper(model)
}
Run Code Online (Sandbox Code Playgroud)
在pyspark的情况下如何添加minConfidence来生成关联规则?我们可以看到scala有例子,而python没有例子。
火花 >= 2.2
有一个DataFrame基本mlAPI 提供AssociationRules:
from pyspark.ml.fpm import FPGrowth
data = ...
fpm = FPGrowth(minSupport=0.3, minConfidence=0.9).fit(data)
associationRules = fpm.associationRules.
Run Code Online (Sandbox Code Playgroud)
火花 < 2.2
至于现在 PySpark 不支持提取关联规则(DataFrame基于FPGrowthPython 支持的API 正在进行中SPARK-1450)但我们可以轻松解决这个问题。
首先,您必须安装 SBT(只需转到下载页面)并按照您的操作系统的说明进行操作。
接下来,您必须创建一个只有两个文件的简单 Scala 项目:
.
??? AssociationRulesExtractor.scala
??? build.sbt
Run Code Online (Sandbox Code Playgroud)
您可以稍后调整它以遵循既定的目录结构。
接下来将以下内容添加到build.sbt(调整 Scala 版本和 Spark 版本以匹配您使用的版本):
name := "fpm"
version := "1.0"
scalaVersion := "2.10.6"
val sparkVersion = "1.6.2"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion
)
Run Code Online (Sandbox Code Playgroud)
并遵循AssociationRulesExtractor.scala:
package com.example.fpm
import org.apache.spark.mllib.fpm.AssociationRules.Rule
import org.apache.spark.rdd.RDD
object AssociationRulesExtractor {
def apply(rdd: RDD[Rule[String]]) = {
rdd.map(rule => Array(
rule.confidence, rule.javaAntecedent, rule.javaConsequent
))
}
}
Run Code Online (Sandbox Code Playgroud)
打开您选择的终端模拟器,转到项目的根目录并调用:
sbt package
Run Code Online (Sandbox Code Playgroud)
它将在目标目录中生成一个 jar 文件。例如在 Scala 2.10 中,它将是:
target/scala-2.10/fpm_2.10-1.0.jar
Run Code Online (Sandbox Code Playgroud)
启动 PySpark shell 或使用spark-submit并将路径传递到生成的 jar 文件--driver-class-path:
bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar
Run Code Online (Sandbox Code Playgroud)
在非本地模式下:
bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar --jars /path/to/fpm_2.10-1.0.jar
Run Code Online (Sandbox Code Playgroud)
在集群模式下,jar 应该存在于所有节点上。
添加一些方便的包装器:
bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar
Run Code Online (Sandbox Code Playgroud)
最后,您可以将这些助手用作函数:
generateAssociationRules(model, 0.9)
Run Code Online (Sandbox Code Playgroud)
或作为一种方法:
bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar --jars /path/to/fpm_2.10-1.0.jar
Run Code Online (Sandbox Code Playgroud)
此解决方案依赖于内部 PySpark 方法,因此不能保证它可以在版本之间移植。