PySpark - 从另一列传递一个值作为 spark 函数的参数

Utk*_*ahu 6 apache-spark apache-spark-sql pyspark

我有一个 spark 数据框,它看起来像这样,其中 expr 是 SQL/Hive 过滤器表达式。

+-----------------------------------------+
|expr                     |var1     |var2 |
+-------------------------+---------+-----+
|var1 > 7                 |9        |0    |
|var1 > 7                 |9        |0    |
|var1 > 7                 |9        |0    |
|var1 > 7                 |9        |0    |
|var1 = 3 AND var2 >= 0   |9        |0    |
|var1 = 3 AND var2 >= 0   |9        |0    |
|var1 = 3 AND var2 >= 0   |9        |0    |
|var1 = 3 AND var2 >= 0   |9        |0    |
|var1 = 2 AND var2 >= 0   |9        |0    |
+-------------------------+---------+-----+
Run Code Online (Sandbox Code Playgroud)

我想将此数据框转换为下面的数据框,其中标志是在评估列“expr”中的表达式后找到的布尔值

+---------------------------------------------------+
|expr                     |var1     |var2 |flag     |
+-------------------------+---------+-----+---------+
|var1 > 7                 |9        |0    |  True   |
|var1 > 7                 |9        |0    |  True   |
|var1 > 7                 |9        |0    |  True   |
|var1 > 7                 |9        |0    |  True   |
|var1 = 3 AND var2 >= 0   |9        |0    |     .   |
|var1 = 3 AND var2 >= 0   |9        |0    |     .   |
|var1 = 3 AND var2 >= 0   |9        |0    |     .   |
|var1 = 3 AND var2 >= 0   |9        |0    |     .   |
|var1 = 2 AND var2 >= 0   |9        |0    |     .   |
+-------------------------+---------+-----+---------+
Run Code Online (Sandbox Code Playgroud)

我曾尝试使用这样的 expr 函数:

df.withColumn('flag', expr(col('expr')))
Run Code Online (Sandbox Code Playgroud)

它会按预期失败,因为 expr 函数需要一个字符串作为参数。

我想到的另一个想法是制作一个 UDF 并将“expr”列的值传递给它,但这将不允许我使用 pyspark 的 expr 函数,因为 UDF 都是非火花代码。

我的方法应该是什么?请问有什么建议吗?

Dee*_*Dee 5

所以这是一个没有 UDF 的 PySpark 解决方案。在 Scala 中,我相信您可以使用具有相同逻辑的 map 或 foldleft。

exprs = df.select('expr').distinct().collect()[0][0]

for ex in exprs:
    df = df.withColumn('test', when(col('expr') == lit(ex), expr(ex)))
    
df.show()
Run Code Online (Sandbox Code Playgroud)
+--------------------+----+----+----+
|                expr|var1|var2|test|
+--------------------+----+----+----+
|            var1 > 7|   9|   0|true|
|            var1 > 7|   9|   0|true|
|            var1 > 7|   9|   0|true|
|            var1 > 7|   9|   0|true|
|var1 = 3 AND var2...|   9|   0|null|
|var1 = 3 AND var2...|   9|   0|null|
|var1 = 3 AND var2...|   9|   0|null|
|var1 = 3 AND var2...|   9|   0|null|
|var1 = 2 AND var2...|   9|   0|null|
+--------------------+----+----+----+
Run Code Online (Sandbox Code Playgroud)

我应该指出,我不明白为什么 OP 想要这样做,如果他们为问题提供了更好的上下文,我相信有更好的方法。

迭代 DF 并不是最有效的做法,但在这种情况下,它实际上工作得非常快,因为它不会迭代数据,因此 Spark 将在一个计划内实际执行它。此外,单个 collect() 仅将 20 多万个 DF 的执行时间增加了 2 秒。


更新:

我现在更好地理解了这个问题,这会更快,因为 Spark 会在将它们合并到一列之前一次计算所有过滤器。

exprs = df.select('expr').distinct().collect()[0][0]

for ex in exprs:
    df = df.withColumn('test', when(col('expr') == lit(ex), expr(ex)))
    
df.show()
Run Code Online (Sandbox Code Playgroud)
+--------------------+----+----+----+
|                expr|var1|var2|test|
+--------------------+----+----+----+
|            var1 > 7|   9|   0|true|
|            var1 > 7|   9|   0|true|
|            var1 > 7|   9|   0|true|
|            var1 > 7|   9|   0|true|
|var1 = 3 AND var2...|   9|   0|null|
|var1 = 3 AND var2...|   9|   0|null|
|var1 = 3 AND var2...|   9|   0|null|
|var1 = 3 AND var2...|   9|   0|null|
|var1 = 2 AND var2...|   9|   0|null|
+--------------------+----+----+----+
Run Code Online (Sandbox Code Playgroud)