我知道如何在Spark SQL中编写UDF:
def belowThreshold(power: Int): Boolean = {
return power < -40
}
sqlContext.udf.register("belowThreshold", belowThreshold _)
Run Code Online (Sandbox Code Playgroud)
我可以做类似的定义聚合函数吗?这是怎么做到的?
对于上下文,我想运行以下SQL查询:
val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
FROM ifDF
WHERE opticalReceivePower IS NOT null
GROUP BY span, timestamp
ORDER BY span""")
Run Code Online (Sandbox Code Playgroud)
它应该返回类似的东西
Row(span1, false, T0)
我希望聚合函数告诉我opticalReceivePower在定义的组中是否有任何值span,timestamp哪些值低于阈值.我是否需要以不同的方式将UDAF写入上面粘贴的UDF?
scala aggregate-functions user-defined-functions apache-spark apache-spark-sql
比方说,我的团队选择Python作为Spark开发的参考语言.但是后来由于性能原因,我们希望开发特定的Scala或Java特定的库,以便使用我们的Python代码(类似于使用Scala或Java骨架的Python存根)进行映射.
难道您不认为是否可以将新的自定义Python方法与一些Scala或Java用户定义函数联系起来?
我有一个非常大的时间序列数据表,其中包含以下列:
应考虑整组数据处理每个LicensePlate/UberRide数据集合.换句话说,我不需要逐行处理数据,而是将所有行按(LicensePlate/UberRide)一起分组.
我计划在数据帧api中使用spark,但我对如何在spark分组数据帧上执行自定义计算感到困惑.
我需要做的是:
我该怎么做第3步和第4步?我应该使用哪些关于spark API(数据帧,数据集,rdd,也许是pandas ......)的提示?
整个工作流程如下:
我有一个PySpark DataFrame,其中一列作为一个热编码向量.我想在groupby之后通过向量加法来聚合不同的一个热编码向量
例如 df[userid,action] Row1: ["1234","[1,0,0]] Row2: ["1234", [0 1 0]]
我希望输出为行:["1234", [ 1 1 0]]因此向量是所有向量分组的总和userid.
我怎样才能做到这一点?PySpark sum聚合操作不支持向量加法.
aggregate-functions user-defined-functions apache-spark apache-spark-sql pyspark
我试图将用户定义的函数应用于PySpark中的Window。我已经读过UDAF也许是要走的路,但是我找不到任何具体的东西。
举个例子(摘自:Xinh的技术博客,并针对PySpark进行了修改):
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import avg
spark = SparkSession.builder.master("local").config(conf=SparkConf()).getOrCreate()
a = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"], [4, "d"], [5, "e"]], ['ind', "state"])
customers = spark.createDataFrame([["Alice", "2016-05-01", 50.00],
["Alice", "2016-05-03", 45.00],
["Alice", "2016-05-04", 55.00],
["Bob", "2016-05-01", 25.00],
["Bob", "2016-05-04", 29.00],
["Bob", "2016-05-06", 27.00]],
["name", "date", "amountSpent"])
customers.show()
window_spec = Window.partitionBy("name").orderBy("date").rowsBetween(-1, 1)
result = customers.withColumn( "movingAvg", avg(customers["amountSpent"]).over(window_spec))
result.show()
Run Code Online (Sandbox Code Playgroud)
我正在应用avg已经内置的函数,pyspark.sql.functions但是如果avg我不想使用更复杂的函数并编写自己的函数,该怎么办?
aggregate-functions user-defined-functions window-functions apache-spark pyspark
如何在 PySpark SQL 中实现用户定义聚合函数 (UDAF)?
pyspark version = 3.0.2
python version = 3.7.10
Run Code Online (Sandbox Code Playgroud)
作为一个最小的示例,我想用 UDAF 替换 AVG 聚合函数:
sc = SparkContext()
sql = SQLContext(sc)
df = sql.createDataFrame(
pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')
rv = sql.sql('SELECT id, AVG(value) FROM df GROUP BY id').toPandas()
Run Code Online (Sandbox Code Playgroud)
其中 rv 将是:
In [2]: rv
Out[2]:
id avg(value)
0 1 1.5
1 2 3.5
Run Code Online (Sandbox Code Playgroud)
UDAF 如何替换AVG查询中的内容?
例如,这不起作用
import numpy as np
def udf_avg(x):
return np.mean(x)
sql.udf.register('udf_avg', udf_avg)
rv = …Run Code Online (Sandbox Code Playgroud) user-defined-functions pandas apache-spark apache-spark-sql pyspark
我在pyspark数据框中有以下数据end_stats_df:
values start end cat1 cat2
10 1 2 A B
11 1 2 C B
12 1 2 D B
510 1 2 D C
550 1 2 C B
500 1 2 A B
80 1 3 A B
Run Code Online (Sandbox Code Playgroud)
我想以下列方式聚合它:
cat1和cat2该组中唯一的值数.例如,对于start= 1和end= 2 的组,该数字将是4,因为存在A,B,C,D.该数字将被存储为n(在该示例中n = 4).values字段,我需要对每个组进行排序values,然后选择每个n-1值,其中n是从上面第一个操作中存储的值.cat1和cat2之后的内容.上例中的示例输出是:
values start end …Run Code Online (Sandbox Code Playgroud) 我有一个看起来很简单的问题,但我一直用头撞墙,但没有成功。我基本上是在尝试做与这篇文章相同的事情,只是我不关心该文章的“分组依据”方面,我只想对所有行进行求和。
解释一下链接的帖子,DataFrame 看起来像:
ID,Vec
1,[0,0,5]
2,[3,3,4]
3,[0,8,1]
....
Run Code Online (Sandbox Code Playgroud)
我想按元素对向量求和。
上面示例的所需输出将是一行:
SumOfVectors
[3,11,10]
Run Code Online (Sandbox Code Playgroud)
另一个很大的区别是我使用的是 pyspark,而不是 Scala。我试着rdd.fold()开始工作,但要么工作不一样,要么我无法弄清楚 pyspark 中的语法。
最后一个警告是,我在 ~1MM 行的数据帧和长度 ~10k 的向量上执行此操作,因此这必须相当有效。
谢谢你的帮助!根据评论,下面是一个可重复的玩具数据框。
import numpy as np
from pyspark.ml.linalg import Vectors
n_rows = 100
pdf = np.concatenate([np.array(range(n_rows)), np.random.randn(n_rows), 3*np.random.randn(n_rows)+2, 6*np.random.randn(n_rows)-2]).reshape(n_rows,-1)
dff = map(lambda x: (int(x[0]), Vectors.dense(x[1:])), pdf)
df = spark.createDataFrame(dff,schema=["ID", "Vec"])
Run Code Online (Sandbox Code Playgroud)
df.schema 应该看起来像 StructType(List(StructField(ID,LongType,true),StructField(Vec,VectorUDT,true)))
只是打印df给了我DataFrame[ID: bigint, Vec: vector]
同样重要的是,我在 Spark 2.4
$ spark-submit --version
Welcome to …Run Code Online (Sandbox Code Playgroud)