相关疑难解决方法(0)

如何在Spark SQL中定义和使用用户定义的聚合函数?

我知道如何在Spark SQL中编写UDF:

def belowThreshold(power: Int): Boolean = {
        return power < -40
      }

sqlContext.udf.register("belowThreshold", belowThreshold _)
Run Code Online (Sandbox Code Playgroud)

我可以做类似的定义聚合函数吗?这是怎么做到的?

对于上下文,我想运行以下SQL查询:

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                                    FROM ifDF
                                    WHERE opticalReceivePower IS NOT null
                                    GROUP BY span, timestamp
                                    ORDER BY span""")
Run Code Online (Sandbox Code Playgroud)

它应该返回类似的东西

Row(span1, false, T0)

我希望聚合函数告诉我opticalReceivePower在定义的组中是否有任何值span,timestamp哪些值低于阈值.我是否需要以不同的方式将UDAF写入上面粘贴的UDF?

scala aggregate-functions user-defined-functions apache-spark apache-spark-sql

37
推荐指数
1
解决办法
3万
查看次数

PySpark数据帧上的自定义聚合

我有一个PySpark DataFrame,其中一列作为一个热编码向量.我想在groupby之后通过向量加法来聚合不同的一个热编码向量

例如 df[userid,action] Row1: ["1234","[1,0,0]] Row2: ["1234", [0 1 0]]

我希望输出为行:["1234", [ 1 1 0]]因此向量是所有向量分组的总和userid.

我怎样才能做到这一点?PySpark sum聚合操作不支持向量加法.

aggregate-functions user-defined-functions apache-spark apache-spark-sql pyspark

6
推荐指数
1
解决办法
6030
查看次数

如何在Spark SQL中找到分组Vector列的平均值?

RelationalGroupedDataset通过调用创建了一个instances.groupBy(instances.col("property_name")):

val x = instances.groupBy(instances.col("property_name"))
Run Code Online (Sandbox Code Playgroud)

如何组合用户定义的聚合函数来对每个组执行Statistics.colStats().mean

谢谢!

aggregate-functions user-defined-functions apache-spark apache-spark-sql apache-spark-ml

5
推荐指数
1
解决办法
2327
查看次数

具有“稀疏”向量组的数据帧通过聚合,使用 Scala 在火花中不密集向量

我有一个如下所示的 Spark 数据框,它填充了稀疏向量但不是密集向量

+---+--------+-----+-------------+
|id |catagery|index|vec          |
+---+--------+-----+-------------+
|a  |ii      |3.0  |(5,[3],[1.0])|
|a  |ll      |0.0  |(5,[0],[1.0])|
|b  |dd      |4.0  |(5,[4],[1.0])|
|b  |kk      |2.0  |(5,[2],[1.0])|
|b  |gg      |5.0  |(5,[],[])    |
|e  |hh      |1.0  |(5,[1],[1.0])|
+---+--------+-----+-------------+
Run Code Online (Sandbox Code Playgroud)

众所周知,如果我像这样尝试

val rr=result.groupBy("id").agg(sum("index")) scala> rr.show(false)

  +---+----------+                                                                
  |id |sum(index)|
  +---+----------+
  |e  |1.0       |
  |b  |11.0      |
  |a  |3.0       |
  +---+----------+
Run Code Online (Sandbox Code Playgroud)

但是如何使用“groupBy”和“agg”来求和稀疏向量?我想要这样的最终数据帧:

      +---+-------------------------+                              
      |id |   vecResult             |
      +---+-------------------------+
      |a  |(5,[0,3],[1.0,1.0])      |
      |b  |(5,[2,4,5],[1.0,1.0,1.0])|
      |e  |(5,[1],[1.0])            |
      +---+-------------------------+
Run Code Online (Sandbox Code Playgroud)

我觉得 VectorAssembler() 可以解决这个问题,但是我不知道怎么写代码,我应该使用 udf 吗?

scala group-by apache-spark spark-dataframe apache-spark-mllib

5
推荐指数
0
解决办法
604
查看次数

pyspark - 按元素聚合(求和)向量

我有一个看起来很简单的问题,但我一直用头撞墙,但没有成功。我基本上是在尝试做与这篇文章相同的事情,只是我不关心该文章的“分组依据”方面,我只想对所有行进行求和。

解释一下链接的帖子,DataFrame 看起来像:

ID,Vec
1,[0,0,5]
2,[3,3,4]
3,[0,8,1]
....
Run Code Online (Sandbox Code Playgroud)

我想按元素对向量求和。

上面示例的所需输出将是一行:

SumOfVectors
[3,11,10]
Run Code Online (Sandbox Code Playgroud)

另一个很大的区别是我使用的是 pyspark,而不是 Scala。我试着rdd.fold()开始工作,但要么工作不一样,要么我无法弄清楚 pyspark 中的语法。

最后一个警告是,我在 ~1MM 行的数据帧和长度 ~10k 的向量上执行此操作,因此这必须相当有效。

谢谢你的帮助!根据评论,下面是一个可重复的玩具数据框。

import numpy as np
from pyspark.ml.linalg import Vectors

n_rows = 100

pdf = np.concatenate([np.array(range(n_rows)), np.random.randn(n_rows), 3*np.random.randn(n_rows)+2, 6*np.random.randn(n_rows)-2]).reshape(n_rows,-1)
dff = map(lambda x: (int(x[0]), Vectors.dense(x[1:])), pdf)

df = spark.createDataFrame(dff,schema=["ID", "Vec"])
Run Code Online (Sandbox Code Playgroud)

df.schema 应该看起来像 StructType(List(StructField(ID,LongType,true),StructField(Vec,VectorUDT,true)))

只是打印df给了我DataFrame[ID: bigint, Vec: vector]

同样重要的是,我在 Spark 2.4

$ spark-submit --version
Welcome to …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

4
推荐指数
3
解决办法
3580
查看次数

使用ArrayType的Spark UDAF作为bufferSchema性能问题

我正在研究一个返回元素数组的UDAF.

每次更新的输入都是索引和值的元组.

UDAF的作用是对同一索引下的所有值求和.

例:

对于输入(索引,值):( 2,1),(3,1),(2,3)

应该返回(0,0,4,1,...,0)

逻辑工作正常,但我有更新方法的问题,我的实现只更新每行1个单元格,但该方法中的最后一个赋值实际上复制整个数组 - 这是多余的,非常耗时.

仅此分配负责我的查询执行时间的98%.

我的问题是,我怎样才能缩短那段时间?是否可以在缓冲区数组中分配1个值而无需替换整个缓冲区?

PS:我正在使用Spark 1.6,我不能很快升级它,所以请坚持使用适用于此版本的解决方案.

class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{

  val bucketSize = 1000

  def inputSchema: StructType =  StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)

  def dataType: DataType = ArrayType(LongType)

  def deterministic: Boolean = true

  def bufferSchema: StructType = {
    StructType(
      StructField("buckets", ArrayType(LongType)) :: Nil  
    )
  }

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new Array[Long](bucketSize)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = …
Run Code Online (Sandbox Code Playgroud)

performance scala user-defined-functions apache-spark apache-spark-sql

3
推荐指数
1
解决办法
1818
查看次数