Max*_*Max 4 python dataframe apache-spark apache-spark-sql pyspark
如果我有Spark DataFrame包含arrays,我可以通过UDF在这些数组上使用Python List方法吗?如何将Spark DataFrame array<double>转换为Python列表?
下面是一个示例,其中包含一些UDF.我不确定为什么采取最大的工作,但采取len不.最后,我想创建一个包含原始数组列中的采样值的新列.如果您也可以提供帮助,那么也会出现关于期待两个参数,奖励积分的错误!
我有以下Spark DataFrame:
from pyspark.sql.functions import udf
from pyspark.sql import Row
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ArrayType
import random
df = sc.parallelize([Row(name='Joe',scores=[1.0,2.0,3.0]),
Row(name='Mary', scores=[3.0]),
Row(name='Mary', scores=[4.0,7.1])]).toDF()
>>> df.show()
+----+---------------+
|name| scores|
+----+---------------+
| Joe|[1.0, 2.0, 3.0]|
|Mary| [3.0]|
|Mary| [4.0, 7.1]|
+----+---------------+
>>> df
DataFrame[name: string, scores: array<double>]
def sampleWithReplacement(listIn,samples):
tempList = array()
count=0
while (count<samples):
tempList.append(random.sample(listIn,1)[0])
count=count+1
return tempList
def maxArray(listIn):
return max(listIn)
def lenArray(listIn):
return len(listIn)
sampUDF=udf(sampleWithReplacement,ArrayType())
maxUDF=udf(maxArray,IntegerType())
lenUDF=udf(lenArray,IntegerType())
>>> df.withColumn("maxCol",maxUDF(df.scores)).show()
+----+---------------+------+
|name| scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]| null|
|Mary| [3.0]| null|
|Mary| [4.0, 7.1]| null|
+----+---------------+------+
>>> df.withColumn("maxCol",lenUDF(df.scores)).show()
+----+---------------+------+
|name| scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]| 3|
|Mary| [3.0]| 1|
|Mary| [4.0, 7.1]| 2|
+----+---------------+------+
Run Code Online (Sandbox Code Playgroud)
use*_*411 10
TL; DR当你有选择时总是更喜欢内置函数udf.计算长度使用size(别名为length)方法:
from pyspark.sql.functions import length, size
df.withColumn("len", size("scores"))
Run Code Online (Sandbox Code Playgroud)
对于小型阵列,您可以尝试
from pyspark.sql.functions import sort_array
df.withColumn("max", sort_array("scores", False)[0])
Run Code Online (Sandbox Code Playgroud)
但是对于大型系列来说,它当然不是一个好选择.
Spark DataFrame数组是否与Python列表不同?
在内部它们是不同的,因为有Scala对象.在访问时udf有简单的Python列表.出了什么问题?
我们来看看类型.scores专栏是array<double>.转换为Python类型时,会产生一个List[float].当你打电话给max你时,你得到一个float输出.
然而,您将返回类型声明为IntegerType.因为float无法转换为整数丢失的精度结果是未定义的NULL.正确选择返回类型是:DoubleType或FloatType:
maxf = udf(lambda xs: max(xs), FloatType())
maxd = udf(lambda xs: max(xs), DoubleType())
(sc
.parallelize([("Joe", [1.0, 2.0, 3.0])])
.toDF(["name", "scores"])
.select("*", maxf("scores"), maxd("scores")))
Run Code Online (Sandbox Code Playgroud)
结果:
+----+---------------+----------------+----------------+
|name| scores|<lambda>(scores)|<lambda>(scores)|
+----+---------------+----------------+----------------+
| Joe|[1.0, 2.0, 3.0]| 3.0| 3.0|
+----+---------------+----------------+----------------+
Run Code Online (Sandbox Code Playgroud)
和架构:
root
|-- name: string (nullable = true)
|-- scores: array (nullable = true)
| |-- element: double (containsNull = true)
|-- <lambda>(scores): float (nullable = true)
|-- <lambda>(scores): double (nullable = true)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
8270 次 |
| 最近记录: |