我知道如何在Spark SQL中编写UDF:
def belowThreshold(power: Int): Boolean = {
return power < -40
}
sqlContext.udf.register("belowThreshold", belowThreshold _)
Run Code Online (Sandbox Code Playgroud)
我可以做类似的定义聚合函数吗?这是怎么做到的?
对于上下文,我想运行以下SQL查询:
val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
FROM ifDF
WHERE opticalReceivePower IS NOT null
GROUP BY span, timestamp
ORDER BY span""")
Run Code Online (Sandbox Code Playgroud)
它应该返回类似的东西
Row(span1, false, T0)
我希望聚合函数告诉我opticalReceivePower在定义的组中是否有任何值span,timestamp哪些值低于阈值.我是否需要以不同的方式将UDAF写入上面粘贴的UDF?
scala aggregate-functions user-defined-functions apache-spark apache-spark-sql
我有一个PySpark DataFrame,其中一列作为一个热编码向量.我想在groupby之后通过向量加法来聚合不同的一个热编码向量
例如 df[userid,action] Row1: ["1234","[1,0,0]] Row2: ["1234", [0 1 0]]
我希望输出为行:["1234", [ 1 1 0]]因此向量是所有向量分组的总和userid.
我怎样才能做到这一点?PySpark sum聚合操作不支持向量加法.
aggregate-functions user-defined-functions apache-spark apache-spark-sql pyspark
我RelationalGroupedDataset通过调用创建了一个instances.groupBy(instances.col("property_name")):
val x = instances.groupBy(instances.col("property_name"))
Run Code Online (Sandbox Code Playgroud)
如何组合用户定义的聚合函数来对每个组执行Statistics.colStats().mean?
谢谢!
aggregate-functions user-defined-functions apache-spark apache-spark-sql apache-spark-ml
我有一个如下所示的 Spark 数据框,它填充了稀疏向量但不是密集向量:
+---+--------+-----+-------------+
|id |catagery|index|vec |
+---+--------+-----+-------------+
|a |ii |3.0 |(5,[3],[1.0])|
|a |ll |0.0 |(5,[0],[1.0])|
|b |dd |4.0 |(5,[4],[1.0])|
|b |kk |2.0 |(5,[2],[1.0])|
|b |gg |5.0 |(5,[],[]) |
|e |hh |1.0 |(5,[1],[1.0])|
+---+--------+-----+-------------+
Run Code Online (Sandbox Code Playgroud)
众所周知,如果我像这样尝试
val rr=result.groupBy("id").agg(sum("index"))
scala> rr.show(false)
+---+----------+
|id |sum(index)|
+---+----------+
|e |1.0 |
|b |11.0 |
|a |3.0 |
+---+----------+
Run Code Online (Sandbox Code Playgroud)
但是如何使用“groupBy”和“agg”来求和稀疏向量?我想要这样的最终数据帧:
+---+-------------------------+
|id | vecResult |
+---+-------------------------+
|a |(5,[0,3],[1.0,1.0]) |
|b |(5,[2,4,5],[1.0,1.0,1.0])|
|e |(5,[1],[1.0]) |
+---+-------------------------+
Run Code Online (Sandbox Code Playgroud)
我觉得 VectorAssembler() 可以解决这个问题,但是我不知道怎么写代码,我应该使用 udf 吗?
scala group-by apache-spark spark-dataframe apache-spark-mllib
我有一个看起来很简单的问题,但我一直用头撞墙,但没有成功。我基本上是在尝试做与这篇文章相同的事情,只是我不关心该文章的“分组依据”方面,我只想对所有行进行求和。
解释一下链接的帖子,DataFrame 看起来像:
ID,Vec
1,[0,0,5]
2,[3,3,4]
3,[0,8,1]
....
Run Code Online (Sandbox Code Playgroud)
我想按元素对向量求和。
上面示例的所需输出将是一行:
SumOfVectors
[3,11,10]
Run Code Online (Sandbox Code Playgroud)
另一个很大的区别是我使用的是 pyspark,而不是 Scala。我试着rdd.fold()开始工作,但要么工作不一样,要么我无法弄清楚 pyspark 中的语法。
最后一个警告是,我在 ~1MM 行的数据帧和长度 ~10k 的向量上执行此操作,因此这必须相当有效。
谢谢你的帮助!根据评论,下面是一个可重复的玩具数据框。
import numpy as np
from pyspark.ml.linalg import Vectors
n_rows = 100
pdf = np.concatenate([np.array(range(n_rows)), np.random.randn(n_rows), 3*np.random.randn(n_rows)+2, 6*np.random.randn(n_rows)-2]).reshape(n_rows,-1)
dff = map(lambda x: (int(x[0]), Vectors.dense(x[1:])), pdf)
df = spark.createDataFrame(dff,schema=["ID", "Vec"])
Run Code Online (Sandbox Code Playgroud)
df.schema 应该看起来像 StructType(List(StructField(ID,LongType,true),StructField(Vec,VectorUDT,true)))
只是打印df给了我DataFrame[ID: bigint, Vec: vector]
同样重要的是,我在 Spark 2.4
$ spark-submit --version
Welcome to …Run Code Online (Sandbox Code Playgroud) 我正在研究一个返回元素数组的UDAF.
每次更新的输入都是索引和值的元组.
UDAF的作用是对同一索引下的所有值求和.
例:
对于输入(索引,值):( 2,1),(3,1),(2,3)
应该返回(0,0,4,1,...,0)
逻辑工作正常,但我有更新方法的问题,我的实现只更新每行1个单元格,但该方法中的最后一个赋值实际上复制整个数组 - 这是多余的,非常耗时.
仅此分配负责我的查询执行时间的98%.
我的问题是,我怎样才能缩短那段时间?是否可以在缓冲区数组中分配1个值而无需替换整个缓冲区?
PS:我正在使用Spark 1.6,我不能很快升级它,所以请坚持使用适用于此版本的解决方案.
class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{
val bucketSize = 1000
def inputSchema: StructType = StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)
def dataType: DataType = ArrayType(LongType)
def deterministic: Boolean = true
def bufferSchema: StructType = {
StructType(
StructField("buckets", ArrayType(LongType)) :: Nil
)
}
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = new Array[Long](bucketSize)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = …Run Code Online (Sandbox Code Playgroud) performance scala user-defined-functions apache-spark apache-spark-sql