在整行上使用udf过滤Pyspark Dataframe

use*_*973 2 user-defined-functions apache-spark-sql pyspark

有没有办法选择整个行作为列输入到Pyspark过滤器udf中?

我有一个复杂的过滤功能“ my_filter”,我想将其应用于整个DataFrame:

my_filter_udf = udf(lambda r: my_filter(r), BooleanType())
new_df = df.filter(my_filter_udf(col("*"))
Run Code Online (Sandbox Code Playgroud)

col("*")
Run Code Online (Sandbox Code Playgroud)

抛出错误,因为这不是有效的操作。

我知道我可以将数据帧转换为RDD,然后使用RDD的filter方法,但是我不想将其转换为RDD,然后再转换为数据帧。我的DataFrame具有复杂的嵌套类型,因此当我尝试再次将RDD转换为数据帧时,架构推断将失败。

ham*_*una 7

您应该静态地写所有列。例如:

from pyspark.sql import functions as F

# create sample df
df = sc.parallelize([
     (1, 'b'),
     (1, 'c'),

 ]).toDF(["id", "category"])

#simple filter function
@F.udf(returnType=BooleanType())
def my_filter(col1, col2):
    return (col1>0) & (col2=="b")

df.filter(my_filter('id', 'category')).show()
Run Code Online (Sandbox Code Playgroud)

结果:

+---+--------+
| id|category|
+---+--------+
|  1|       b|
+---+--------+
Run Code Online (Sandbox Code Playgroud)

如果您有很多列,并且您确定要按列排序:

cols = df.columns
df.filter(my_filter(*cols)).show()
Run Code Online (Sandbox Code Playgroud)

产生相同的输出。