Pra*_*jan 6 apache-spark apache-spark-sql hive-udf
我需要找到多个双数据类型列的中位数.请求建议找到正确的方法.
下面是我的一个列的示例数据集.我期待我的样本中值返回为1.
scala> sqlContext.sql("select num from test").show();
+---+
|num|
+---+
|0.0|
|0.0|
|1.0|
|1.0|
|1.0|
|1.0|
+---+
Run Code Online (Sandbox Code Playgroud)
我尝试了以下选项
1)Hive UDAF百分位数,它仅适用于BigInt.
2)Hive UDAT percentile_approx,但它不能按预期工作(返回0.25 vs 1).
sqlContext.sql("从test中选择percentile_approx(num,0.5)".show();
+----+
| _c0|
+----+
|0.25|
+----+
Run Code Online (Sandbox Code Playgroud)
3)Spark窗口函数percent_rank-找到中位数我看到的方法是查找高于0.5的所有percent_rank并选择max percent_rank的相应num值.但它并不适用于所有情况,特别是当我有记录计数时,在这种情况下,中位数是排序分布中的中间值的平均值.
同样在percent_rank中,因为我必须找到多列的中位数,我必须在不同的数据帧中计算它,这对我来说是一个很复杂的方法.如果我的理解不对,请纠正我.
+---+-------------+
|num|percent_rank |
+---+-------------+
|0.0|0.0|
|0.0|0.0|
|1.0|0.4|
|1.0|0.4|
|1.0|0.4|
|1.0|0.4|
+---+---+
Run Code Online (Sandbox Code Playgroud)
出于好奇,您使用的是哪个版本的Apache Spark?Apache Spark 2.0+中有一些修复,其中包括更改approxQuantile.
如果我要运行下面的pySpark代码片段:
rdd = sc.parallelize([[1, 0.0], [1, 0.0], [1, 1.0], [1, 1.0], [1, 1.0], [1, 1.0]])
df = rdd.toDF(['id', 'num'])
df.createOrReplaceTempView("df")
Run Code Online (Sandbox Code Playgroud)
与median使用计算approxQuantile为:
df.approxQuantile("num", [0.5], 0.25)
Run Code Online (Sandbox Code Playgroud)
要么
spark.sql("select percentile_approx(num, 0.5) from df").show()
Run Code Online (Sandbox Code Playgroud)
结果是:
注意,因为这些是近似数字(via approxQuantile),但一般来说这应该很好.如果您需要准确的中位数,可以使用一种方法numpy.median.下面的代码片段是df根据gench对如何使用Python Dataframe API查找Apache Spark的中位数的SO响应更新的.:
from pyspark.sql.types import *
import pyspark.sql.functions as F
import numpy as np
def find_median(values):
try:
median = np.median(values) #get the median of values in a list in each row
return round(float(median),2)
except Exception:
return None #if there is anything wrong with the given values
median_finder = F.udf(find_median,FloatType())
df2 = df.groupBy("id").agg(F.collect_list("num").alias("nums"))
df2 = df2.withColumn("median", median_finder("nums"))
# print out
df2.show()
Run Code Online (Sandbox Code Playgroud)
输出:
+---+--------------------+------+
| id| nums|median|
+---+--------------------+------+
| 1|[0.0, 0.0, 1.0, 1...| 1.0|
+---+--------------------+------+
Run Code Online (Sandbox Code Playgroud)
如果您使用的是Spark 1.6,则可以median通过Eugene Zhulenev的响应计算使用Scala代码如何使用Apache Spark计算精确的中位数.以下是与我们的示例一起使用的修改后的代码.
import org.apache.spark.SparkContext._
val rdd: RDD[Double] = sc.parallelize(Seq((0.0), (0.0), (1.0), (1.0), (1.0), (1.0)))
val sorted = rdd.sortBy(identity).zipWithIndex().map {
case (v, idx) => (idx, v)
}
val count = sorted.count()
val median: Double = if (count % 2 == 0) {
val l = count / 2 - 1
val r = l + 1
(sorted.lookup(l).head + sorted.lookup(r).head).toDouble / 2
} else sorted.lookup(count / 2).head.toDouble
Run Code Online (Sandbox Code Playgroud)
输出:
// output
import org.apache.spark.SparkContext._
rdd: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[227] at parallelize at <console>:34
sorted: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[234] at map at <console>:36
count: Long = 6
median: Double = 1.0
Run Code Online (Sandbox Code Playgroud)
注意,这是使用计算精确中位数RDDs- 即您需要将DataFrame列转换为RDD以执行此计算.
| 归档时间: |
|
| 查看次数: |
3070 次 |
| 最近记录: |