PySpark - Is a Spark DataFrame array different from a Python list?

Max*_*Max 4 python dataframe apache-spark apache-spark-sql pyspark

If I have a Spark DataFrame containing arrays, can I use Python list methods on those arrays via a UDF? How can I convert a Spark DataFrame array<double> into a Python list?

Below is an example with a few UDFs. I'm not sure why taking the len works but taking the max does not. Ultimately, I want to create a new column containing values sampled from the original array column. Bonus points if you can also help with the error about expecting two arguments!

I have the following Spark DataFrame:

from pyspark.sql.functions import udf
from pyspark.sql import Row
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import ArrayType
import random

df = sc.parallelize([Row(name='Joe', scores=[1.0, 2.0, 3.0]),
                     Row(name='Mary', scores=[3.0]),
                     Row(name='Mary', scores=[4.0, 7.1])]).toDF()
>>> df.show()
+----+---------------+
|name|         scores|
+----+---------------+
| Joe|[1.0, 2.0, 3.0]|
|Mary|          [3.0]|
|Mary|     [4.0, 7.1]|
+----+---------------+
>>> df
DataFrame[name: string, scores: array<double>]
def sampleWithReplacement(listIn, samples):
    tempList = []  # a plain Python list, not array()
    count = 0
    while count < samples:
        tempList.append(random.sample(listIn, 1)[0])
        count = count + 1
    return tempList

def maxArray(listIn):
    return max(listIn)

def lenArray(listIn):
    return len(listIn)
sampUDF = udf(sampleWithReplacement, ArrayType())
maxUDF = udf(maxArray, IntegerType())
lenUDF = udf(lenArray, IntegerType())

>>> df.withColumn("maxCol",maxUDF(df.scores)).show()
+----+---------------+------+
|name|         scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]|  null|
|Mary|          [3.0]|  null|
|Mary|     [4.0, 7.1]|  null|
+----+---------------+------+

>>> df.withColumn("maxCol",lenUDF(df.scores)).show()
+----+---------------+------+
|name|         scores|maxCol|
+----+---------------+------+
| Joe|[1.0, 2.0, 3.0]|     3|
|Mary|          [3.0]|     1|
|Mary|     [4.0, 7.1]|     2|
+----+---------------+------+

use*_*411 10

TL;DR When you have a choice, always prefer built-in functions over a udf. To compute the length, use the size function (aliased as length):

from pyspark.sql.functions import length, size 

df.withColumn("len", size("scores"))

For small arrays you can try

from pyspark.sql.functions import sort_array

df.withColumn("max", sort_array("scores", False)[0])

but it is certainly not a good option for large arrays.
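The reason is easy to see in plain Python, which is what a udf body would run anyway: computing a maximum via a full descending sort (roughly what sort_array(...)[0] does) is O(n log n), while a single linear scan with max is O(n), even though both give the same answer. A minimal sketch:

```python
import random

xs = [random.random() for _ in range(100_000)]

# max via a full descending sort -- does strictly more work than needed
max_by_sort = sorted(xs, reverse=True)[0]

# max via a single linear pass
max_by_scan = max(xs)

assert max_by_sort == max_by_scan
```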

Is a Spark DataFrame array different from a Python list?

Internally they are different, because they are Scala objects. But when accessed inside a udf, you get plain Python lists. So what went wrong?

Let's look at the types. The scores column is array<double>. When converted to a Python type it yields a List[float], and when you call max on it you get a float back.
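You can verify this in plain Python, which is exactly what the udf body sees (a minimal sketch, independent of Spark):

```python
scores = [1.0, 2.0, 3.0]  # what an array<double> becomes inside a udf

result = max(scores)

# max over a list of floats returns a float, not an int
assert isinstance(result, float)
assert result == 3.0

# Spark will not silently coerce this float into an IntegerType column;
# that mismatch is what produced the NULLs in maxCol above.
```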

You declared the return type as IntegerType, however. Because a float cannot be converted to an integer without losing precision, the result is undefined and you get NULL. The correct choice of return type is either DoubleType or FloatType:

from pyspark.sql.types import DoubleType, FloatType

maxf = udf(lambda xs: max(xs), FloatType())
maxd = udf(lambda xs: max(xs), DoubleType())

(sc
    .parallelize([("Joe", [1.0, 2.0, 3.0])])
    .toDF(["name", "scores"])
    .select("*", maxf("scores"), maxd("scores")))

The result:

+----+---------------+----------------+----------------+
|name|         scores|<lambda>(scores)|<lambda>(scores)|
+----+---------------+----------------+----------------+
| Joe|[1.0, 2.0, 3.0]|             3.0|             3.0|
+----+---------------+----------------+----------------+

and the schema:

root
 |-- name: string (nullable = true)
 |-- scores: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- <lambda>(scores): float (nullable = true)
 |-- <lambda>(scores): double (nullable = true)
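As for the bonus question: ArrayType cannot be instantiated without an element type, so ArrayType() raises the error about expecting arguments; it needs ArrayType(DoubleType()). Also, every argument to a udf call must be a Column, so a constant sample count has to be wrapped in lit. A minimal sketch of the fixed sampler, with the Spark wiring shown in comments since it needs a running SparkSession:

```python
import random

def sample_with_replacement(xs, n):
    # draw n elements uniformly at random from xs, with replacement
    return [random.choice(xs) for _ in range(n)]

# Spark wiring (assumes a SparkSession and the df from the question):
# from pyspark.sql.functions import udf, lit
# from pyspark.sql.types import ArrayType, DoubleType
#
# sampUDF = udf(sample_with_replacement, ArrayType(DoubleType()))
# df.withColumn("sampled", sampUDF(df.scores, lit(2))).show()
```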