pau*_*ult 9 apache-spark pyspark spark-dataframe pyspark-sql
考虑以下DataFrame:
+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[john, sam, jane]      |
|pet   |[whiskers, rover, fido]|
+------+-----------------------+
Run Code Online (Sandbox Code Playgroud)
可以使用以下代码创建:
import pyspark.sql.functions as f
data = [
    ('person', ['john', 'sam', 'jane']),
    ('pet', ['whiskers', 'rover', 'fido'])
]
df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)
Run Code Online (Sandbox Code Playgroud)
有没有一种方法可以通过对每个元素应用函数而不使用?来直接修改ArrayType()列?"names"udf
例如,假设我想将该函数foo应用于"names"列。(我将使用其中的例子foo是str.upper只用于说明目的,但我的问题是关于可以应用到一个可迭代的元素任何有效的功能。)
foo = lambda x: x.upper()  # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()
Run Code Online (Sandbox Code Playgroud)
TypeError:列不可迭代
我可以使用udf:
foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
Run Code Online (Sandbox Code Playgroud)
在这个具体的例子,我能避免udf爆炸的方式列,通话pyspark.sql.functions.upper(),然后groupBy和collect_list:
df.select('type', f.explode('names').alias('name'))\
    .withColumn('name', f.upper(f.col('name')))\
    .groupBy('type')\
    .agg(f.collect_list('name').alias('names'))\
    .show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
Run Code Online (Sandbox Code Playgroud)
但这是很多代码来完成一些简单的事情。有没有更直接的方法来迭代ArrayType()使用spark-dataframe函数的元素?
在Spark <2.4中,您可以使用用户定义的函数:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType
def transform(f, t=StringType()):
    if not isinstance(t, DataType):
       raise TypeError("Invalid type {}".format(type(t)))
    @udf(ArrayType(t))
    def _(xs):
        if xs is not None:
            return [f(x) for x in xs]
    return _
foo_udf = transform(str.upper)
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
Run Code Online (Sandbox Code Playgroud)
+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
Run Code Online (Sandbox Code Playgroud)
考虑到explode+ collect_list成语的高昂成本,尽管具有固有成本,但几乎完全首选此方法。
在Spark 2.4或更高版本中,可以将transform*与upper(请参阅SPARK-23909)一起使用:
from pyspark.sql.functions import expr
df.withColumn(
    'names', expr('transform(names, x -> upper(x))')
).show(truncate=False)
Run Code Online (Sandbox Code Playgroud)
+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
Run Code Online (Sandbox Code Playgroud)
也可以使用 pandas_udf 
from pyspark.sql.functions import pandas_udf, PandasUDFType
def transform_pandas(f, t=StringType()):
    if not isinstance(t, DataType):
       raise TypeError("Invalid type {}".format(type(t)))
    @pandas_udf(ArrayType(t), PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs)
    return _
foo_udf_pandas = transform_pandas(str.upper)
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
Run Code Online (Sandbox Code Playgroud)
+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[JOHN, SAM, JANE]      |
|pet   |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
Run Code Online (Sandbox Code Playgroud)
尽管只有最新的Arrow / PySpark组合支持处理ArrayType列(SPARK-24259和SPARK-21187)。尽管如此,在支持任意Python函数的同时,此选项应比标准UDF(特别是Serde开销较低)更有效。
* 还支持许多其他高阶函数,包括但不限于filter和aggregate。例如看
|   归档时间:  |  
           
  |  
        
|   查看次数:  |  
           4438 次  |  
        
|   最近记录:  |