相关疑难解决方法(0)

如何在Spark SQL中定义和使用用户定义的聚合函数?

我知道如何在Spark SQL中编写UDF:

def belowThreshold(power: Int): Boolean = {
        return power < -40
      }

sqlContext.udf.register("belowThreshold", belowThreshold _)
Run Code Online (Sandbox Code Playgroud)

我可以做类似的定义聚合函数吗?这是怎么做到的?

对于上下文,我想运行以下SQL查询:

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                                    FROM ifDF
                                    WHERE opticalReceivePower IS NOT null
                                    GROUP BY span, timestamp
                                    ORDER BY span""")
Run Code Online (Sandbox Code Playgroud)

它应该返回类似的东西

Row(span1, false, T0)

我希望聚合函数告诉我opticalReceivePower在定义的组中是否有任何值span,timestamp哪些值低于阈值.我是否需要以不同的方式将UDAF写入上面粘贴的UDF?

scala aggregate-functions user-defined-functions apache-spark apache-spark-sql

37
推荐指数
1
解决办法
3万
查看次数

Spark:如何使用Scala或Java用户定义函数映射Python?

比方说,我的团队选择Python作为Spark开发的参考语言.但是后来由于性能原因,我们希望开发特定的Scala或Java特定的库,以便使用我们的Python代码(类似于使用Scala或Java骨架的Python存根)进行映射.

难道您不认为是否可以将新的自定义Python方法与一些Scala或Java用户定义函数联系起来?

python java scala apache-spark pyspark

21
推荐指数
1
解决办法
1万
查看次数

将自定义函数应用于spark数据帧组

我有一个非常大的时间序列数据表,其中包含以下列:

  • 时间戳
  • LicensePlate
  • UberRide#
  • 速度

应考虑整组数据处理每个LicensePlate/UberRide数据集合.换句话说,我不需要逐行处理数据,而是将所有行按(LicensePlate/UberRide)一起分组.

我计划在数据帧api中使用spark,但我对如何在spark分组数据帧上执行自定义计算感到困惑.

我需要做的是:

  1. 获取所有数据
  2. 按一些列分组
  3. Foreach spark数据帧组应用af(x).返回自定义对象foreach组
  4. 通过应用g(x)并返回单个自定义对象来获取结果

我该怎么做第3步和第4步?我应该使用哪些关于spark API(数据帧,数据集,rdd,也许是pandas ......)的提示?

整个工作流程如下:

工作流程

group-by dataset dataframe apache-spark pyspark

6
推荐指数
2
解决办法
1万
查看次数

PySpark数据帧上的自定义聚合

我有一个PySpark DataFrame,其中一列作为一个热编码向量.我想在groupby之后通过向量加法来聚合不同的一个热编码向量

例如 df[userid,action] Row1: ["1234","[1,0,0]] Row2: ["1234", [0 1 0]]

我希望输出为行:["1234", [ 1 1 0]]因此向量是所有向量分组的总和userid.

我怎样才能做到这一点?PySpark sum聚合操作不支持向量加法.

aggregate-functions user-defined-functions apache-spark apache-spark-sql pyspark

6
推荐指数
1
解决办法
6030
查看次数

用户定义的功能要应用于PySpark中的Window?

我试图将用户定义的函数应用于PySpark中的Window。我已经读过UDAF也许是要走的路,但是我找不到任何具体的东西。

举个例子(摘自:Xinh的技术博客,并针对PySpark进行了修改):

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()

a = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]], ['ind', "state"])

customers = spark.createDataFrame([["Alice", "2016-05-01", 50.00],
                                    ["Alice", "2016-05-03", 45.00],
                                    ["Alice", "2016-05-04", 55.00],
                                    ["Bob", "2016-05-01", 25.00],
                                    ["Bob", "2016-05-04", 29.00],
                                    ["Bob", "2016-05-06", 27.00]],
                               ["name", "date", "amountSpent"])

customers.show()

window_spec = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)

result = customers.withColumn( "movingAvg", avg(customers["amountSpent"]).over(window_spec))

result.show()
Run Code Online (Sandbox Code Playgroud)

我正在应用avg已经内置的函数,pyspark.sql.functions但是如果avg我不想使用更复杂的函数并编写自己的函数,该怎么办?

aggregate-functions user-defined-functions window-functions apache-spark pyspark

5
推荐指数
2
解决办法
4006
查看次数

PySpark SQL 中的用户定义聚合函数

如何在 PySpark SQL 中实现用户定义聚合函数 (UDAF)?

pyspark version = 3.0.2
python version = 3.7.10
Run Code Online (Sandbox Code Playgroud)

作为一个最小的示例,我想用 UDAF 替换 AVG 聚合函数:

sc = SparkContext()
sql = SQLContext(sc)
df = sql.createDataFrame(
    pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')
rv = sql.sql('SELECT id, AVG(value) FROM df GROUP BY id').toPandas()
Run Code Online (Sandbox Code Playgroud)

其中 rv 将是:

In [2]: rv
Out[2]:
   id  avg(value)
0   1         1.5
1   2         3.5
Run Code Online (Sandbox Code Playgroud)

UDAF 如何替换AVG查询中的内容?

例如,这不起作用

import numpy as np
def udf_avg(x):
    return np.mean(x)
sql.udf.register('udf_avg', udf_avg)
rv = …
Run Code Online (Sandbox Code Playgroud)

user-defined-functions pandas apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
2949
查看次数

如何在多列上编写Pyspark UDAF?

我在pyspark数据框中有以下数据end_stats_df:

values     start    end    cat1   cat2
10          1        2      A      B
11          1        2      C      B
12          1        2      D      B
510         1        2      D      C
550         1        2      C      B
500         1        2      A      B
80          1        3      A      B
Run Code Online (Sandbox Code Playgroud)

我想以下列方式聚合它:

  • 我想使用"开始"和"结束"列作为聚合键
  • 对于每组行,我需要执行以下操作:
    • 计算该组cat1cat2该组中唯一的值数.例如,对于start= 1和end= 2 的组,该数字将是4,因为存在A,B,C,D.该数字将被存储为n(在该示例中n = 4).
    • 对于该values字段,我需要对每个组进行排序values,然后选择每个n-1值,其中n是从上面第一个操作中存储的值.
    • 在聚合结束时,我并不关心上述操作中cat1cat2之后的内容.

上例中的示例输出是:

values     start    end …
Run Code Online (Sandbox Code Playgroud)

apache-spark rdd apache-spark-sql pyspark

4
推荐指数
1
解决办法
4010
查看次数

pyspark - 按元素聚合(求和)向量

我有一个看起来很简单的问题,但我一直用头撞墙,但没有成功。我基本上是在尝试做与这篇文章相同的事情,只是我不关心该文章的“分组依据”方面,我只想对所有行进行求和。

解释一下链接的帖子,DataFrame 看起来像:

ID,Vec
1,[0,0,5]
2,[3,3,4]
3,[0,8,1]
....
Run Code Online (Sandbox Code Playgroud)

我想按元素对向量求和。

上面示例的所需输出将是一行:

SumOfVectors
[3,11,10]
Run Code Online (Sandbox Code Playgroud)

另一个很大的区别是我使用的是 pyspark,而不是 Scala。我试着rdd.fold()开始工作,但要么工作不一样,要么我无法弄清楚 pyspark 中的语法。

最后一个警告是,我在 ~1MM 行的数据帧和长度 ~10k 的向量上执行此操作,因此这必须相当有效。

谢谢你的帮助!根据评论,下面是一个可重复的玩具数据框。

import numpy as np
from pyspark.ml.linalg import Vectors

n_rows = 100

pdf = np.concatenate([np.array(range(n_rows)), np.random.randn(n_rows), 3*np.random.randn(n_rows)+2, 6*np.random.randn(n_rows)-2]).reshape(n_rows,-1)
dff = map(lambda x: (int(x[0]), Vectors.dense(x[1:])), pdf)

df = spark.createDataFrame(dff,schema=["ID", "Vec"])
Run Code Online (Sandbox Code Playgroud)

df.schema 应该看起来像 StructType(List(StructField(ID,LongType,true),StructField(Vec,VectorUDT,true)))

只是打印df给了我DataFrame[ID: bigint, Vec: vector]

同样重要的是,我在 Spark 2.4

$ spark-submit --version
Welcome to …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

4
推荐指数
3
解决办法
3580
查看次数