Nor*_*n D 4 parallel-processing aggregate-functions dataframe apache-spark custom-function
我正在努力解决这个超级简单的问题,我已经厌倦了,我希望有人可以帮我解决这个问题.我有一个像这样的形状的数据框:
--------------------------- | Category | Product_ID | |------------+------------+ | a | product 1 | | a | product 2 | | a | product 3 | | a | product 1 | | a | product 4 | | b | product 5 | | b | product 6 | ---------------------------
如何按类别对这些行进行分组并在Scala中应用复杂的功能?也许是这样的:
val result = df.groupBy("Category").apply(myComplexFunction)
Run Code Online (Sandbox Code Playgroud)
这个myComplexFunction应该为每个类别生成下表,并上传成Hive表中的成对相似性或将其保存到HDFS中:
+--------------------------------------------------+ | | Product_1 | Product_2 | Product_3 | +------------+------------+------------------------+ | Product_1 | 1.0 | 0.1 | 0.8 | | Product_2 | 0.1 | 1.0 | 0.5 | | Product_3 | 0.8 | 0.5 | 1.0 | +--------------------------------------------------+
这是我想要应用的函数(它只是计算每个类别中的项目项余弦相似度):
def myComplexFunction(context_data : DataFrame, country_name: String,
context_id: String, table_name_correlations: String,
context_layer: String, context_index: String) : Boolean = {
val unique_identifier = country_name + "_" + context_layer + "_" + context_index
val temp_table_vocabulary = "temp_vocabulary_" + unique_identifier
val temp_table_similarities = "temp_similarities_" + unique_identifier
val temp_table_correlations = "temp_correlations_" + unique_identifier
//context.count()
// fit a CountVectorizerModel from the corpus
//println("Creating sparse incidence matrix")
val cvModel: CountVectorizerModel = new CountVectorizer().setInputCol("words").setOutputCol("features").fit(context_data)
val incidence = cvModel.transform(context_data)
// ========================================================================================
// create dataframe of mapping from indices into the item id
//println("Creating vocabulary")
val vocabulary_rdd = sc.parallelize(cvModel.vocabulary)
val rows_vocabulary_rdd = vocabulary_rdd.zipWithIndex.map{ case (s,i) => Row(s,i)}
val vocabulary_field1 = StructField("Product_ID", StringType, true)
val vocabulary_field2 = StructField("Product_Index", LongType, true)
val schema_vocabulary = StructType(Seq(vocabulary_field1, vocabulary_field2))
val df_vocabulary = hiveContext.createDataFrame(rows_vocabulary_rdd, schema_vocabulary)
// ========================================================================================
//println("Computing similarity matrix")
val myvectors = incidence.select("features").rdd.map(r => r(0).asInstanceOf[Vector])
val mat: RowMatrix = new RowMatrix(myvectors)
val sims = mat.columnSimilarities(0.0)
// ========================================================================================
// Convert records of the Matrix Entry RDD into Rows
//println("Extracting paired similarities")
val rowRdd = sims.entries.map{case MatrixEntry(i, j, v) => Row(i, j, v)}
// ========================================================================================
// create dataframe schema
//println("Creating similarity dataframe")
val field1 = StructField("Product_Index", LongType, true)
val field2 = StructField("Neighbor_Index", LongType, true)
var field3 = StructField("Similarity_Score", DoubleType, true)
val schema_similarities = StructType(Seq(field1, field2, field3))
// create the dataframe
val df_similarities = hiveContext.createDataFrame(rowRdd, schema_similarities)
// ========================================================================================
//println("Register vocabulary and correlations as spark temp tables")
df_vocabulary.registerTempTable(temp_table_vocabulary)
df_similarities.registerTempTable(temp_table_similarities)
// ========================================================================================
//println("Extracting Product_ID")
val temp_corrs = hiveContext.sql(
s"SELECT T1.Product_ID, T2.Neighbor_ID, T1.Similarity_Score " +
s"FROM " +
s"(SELECT Product_ID, Neighbor_Index, Similarity_Score " +
s"FROM $temp_table_similarities LEFT JOIN $temp_table_vocabulary " +
s"WHERE $temp_table_similarities.Product_Index = $temp_table_vocabulary.Product_Index) AS T1 " +
s"LEFT JOIN " +
s"(SELECT Product_ID AS Neighbor_ID, Product_Index as Neighbor_Index FROM $temp_table_vocabulary) AS T2 " +
s"ON " +
s"T1.Neighbor_Index = T2.Neighbor_Index")
// ========================================================================================
val context_corrs = temp_corrs.withColumn("Context_Layer", lit(context_layer)).withColumn("Context_ID", lit(context_id)).withColumn("Country", lit(country_name))
context_corrs.registerTempTable(temp_table_correlations)
// ========================================================================================
hiveContext.sql(s"INSERT INTO TABLE $table_name_correlations SELECT * FROM $temp_table_correlations")
// ========================================================================================
// clean up environment
//println("Cleaning up temp tables")
hiveContext.dropTempTable(temp_table_correlations)
hiveContext.dropTempTable(temp_table_similarities)
hiveContext.dropTempTable(temp_table_vocabulary)
return true
}
val partitioned = tokenized.repartition(tokenized("context_id"))
val context_counts = partitioned.mapPartitions()
//val context_counts = model_code_ids.zipWithIndex.map{case (model_code_id, context_index) => compute_similarity(tokenized.filter(tokenized("context_id") === model_code_id), country_name, model_code_id.asInstanceOf[String], table_name_correlations, context_layer, context_index.toString)}
}
Run Code Online (Sandbox Code Playgroud)
我已经尝试过以下内容:
val category_ids = df.select("Category").distinct.collect()
val result = category_ids.map(category_id => myComplexFunction(df.filter(df("Category") <=> category_id)))
Run Code Online (Sandbox Code Playgroud)
我不知道为什么,但这种方法是顺序运行而不是并行运行.
余弦相似度不是一个复杂的函数,可以使用标准SQL聚合表示.让我们考虑以下示例:
val df = Seq(
("feat1", 1.0, "item1"),
("feat2", 1.0, "item1"),
("feat6", 1.0, "item1"),
("feat1", 1.0, "item2"),
("feat3", 1.0, "item2"),
("feat4", 1.0, "item3"),
("feat5", 1.0, "item3"),
("feat1", 1.0, "item4"),
("feat6", 1.0, "item4")
).toDF("feature", "value", "item")
Run Code Online (Sandbox Code Playgroud)
其中feature是特征标识符,value是对应的值,item是对象标识符feature,item对只有一个对应的值.
余弦相似度定义为:
其中分子可以计算为:
val numer = df.as("this").withColumnRenamed("item", "this")
.join(df.as("other").withColumnRenamed("item", "other"), Seq("feature"))
.where($"this" < $"other")
.groupBy($"this", $"other")
.agg(sum($"this.value" * $"other.value").alias("dot"))
Run Code Online (Sandbox Code Playgroud)
和分母中使用的规范如下:
import org.apache.spark.sql.functions.sqrt
val norms = df.groupBy($"item").agg(sqrt(sum($"value" * $"value")).alias("norm"))
Run Code Online (Sandbox Code Playgroud)
//组合在一起:
val cosine = ($"dot" / ($"this_norm.norm" * $"other_norm.norm")).as("cosine")
val similarities = numer
.join(norms.alias("this_norm").withColumnRenamed("item", "this"), Seq("this"))
.join(norms.alias("other_norm").withColumnRenamed("item", "other"), Seq("other"))
.select($"this", $"other", cosine)
Run Code Online (Sandbox Code Playgroud)
结果表示上三角矩阵的非零项忽略对角线(这是微不足道的):
+-----+-----+-------------------+
| this|other| cosine|
+-----+-----+-------------------+
|item1|item4| 0.8164965809277259|
|item1|item2|0.40824829046386296|
|item2|item4| 0.4999999999999999|
+-----+-----+-------------------+
Run Code Online (Sandbox Code Playgroud)
这相当于:
import org.apache.spark.sql.functions.array
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.mllib.linalg.Vectors
val pivoted = df.groupBy("item").pivot("feature").sum()
.na.fill(0.0)
.orderBy("item")
val mat = new IndexedRowMatrix(pivoted
.select(array(pivoted.columns.tail.map(col): _*))
.rdd
.zipWithIndex
.map {
case (row, idx) =>
new IndexedRow(idx, Vectors.dense(row.getSeq[Double](0).toArray))
})
mat.toCoordinateMatrix.transpose
.toIndexedRowMatrix.columnSimilarities
.toBlockMatrix.toLocalMatrix
Run Code Online (Sandbox Code Playgroud)
0.0 0.408248290463863 0.0 0.816496580927726
0.0 0.0 0.0 0.4999999999999999
0.0 0.0 0.0 0.0
0.0 0.0 0.0 0.0
Run Code Online (Sandbox Code Playgroud)
关于你的代码:
collected)集合上运行.myComplexFunction 无法进一步分发,因为它是分布式数据结构和上下文.