Utk*_*ahu 6 apache-spark apache-spark-sql pyspark
我有一个 spark 数据框,它看起来像这样,其中 expr 是 SQL/Hive 过滤器表达式。
+-----------------------------------------+
|expr |var1 |var2 |
+-------------------------+---------+-----+
|var1 > 7 |9 |0 |
|var1 > 7 |9 |0 |
|var1 > 7 |9 |0 |
|var1 > 7 |9 |0 |
|var1 = 3 AND var2 >= 0 |9 |0 |
|var1 = 3 AND var2 >= 0 |9 |0 |
|var1 = 3 AND var2 >= 0 |9 |0 |
|var1 = 3 AND var2 >= 0 |9 |0 |
|var1 = 2 AND var2 >= 0 |9 |0 |
+-------------------------+---------+-----+
Run Code Online (Sandbox Code Playgroud)
我想将此数据框转换为下面的数据框,其中标志是在评估列“expr”中的表达式后找到的布尔值
+---------------------------------------------------+
|expr |var1 |var2 |flag |
+-------------------------+---------+-----+---------+
|var1 > 7 |9 |0 | True |
|var1 > 7 |9 |0 | True |
|var1 > 7 |9 |0 | True |
|var1 > 7 |9 |0 | True |
|var1 = 3 AND var2 >= 0 |9 |0 | . |
|var1 = 3 AND var2 >= 0 |9 |0 | . |
|var1 = 3 AND var2 >= 0 |9 |0 | . |
|var1 = 3 AND var2 >= 0 |9 |0 | . |
|var1 = 2 AND var2 >= 0 |9 |0 | . |
+-------------------------+---------+-----+---------+
Run Code Online (Sandbox Code Playgroud)
我曾尝试使用这样的 expr 函数:
df.withColumn('flag', expr(col('expr')))
Run Code Online (Sandbox Code Playgroud)
它会按预期失败,因为 expr 函数需要一个字符串作为参数。
我想到的另一个想法是制作一个 UDF 并将“expr”列的值传递给它,但这将不允许我使用 pyspark 的 expr 函数,因为 UDF 都是非火花代码。
我的方法应该是什么?请问有什么建议吗?
所以这是一个没有 UDF 的 PySpark 解决方案。在 Scala 中,我相信您可以使用具有相同逻辑的 map 或 foldleft。
exprs = df.select('expr').distinct().collect()[0][0]
for ex in exprs:
df = df.withColumn('test', when(col('expr') == lit(ex), expr(ex)))
df.show()
Run Code Online (Sandbox Code Playgroud)
+--------------------+----+----+----+
| expr|var1|var2|test|
+--------------------+----+----+----+
| var1 > 7| 9| 0|true|
| var1 > 7| 9| 0|true|
| var1 > 7| 9| 0|true|
| var1 > 7| 9| 0|true|
|var1 = 3 AND var2...| 9| 0|null|
|var1 = 3 AND var2...| 9| 0|null|
|var1 = 3 AND var2...| 9| 0|null|
|var1 = 3 AND var2...| 9| 0|null|
|var1 = 2 AND var2...| 9| 0|null|
+--------------------+----+----+----+
Run Code Online (Sandbox Code Playgroud)
我应该指出,我不明白为什么 OP 想要这样做,如果他们为问题提供了更好的上下文,我相信有更好的方法。
迭代 DF 并不是最有效的做法,但在这种情况下,它实际上工作得非常快,因为它不会迭代数据,因此 Spark 将在一个计划内实际执行它。此外,单个 collect() 仅将 20 多万个 DF 的执行时间增加了 2 秒。
更新:
我现在更好地理解了这个问题,这会更快,因为 Spark 会在将它们合并到一列之前一次计算所有过滤器。
exprs = df.select('expr').distinct().collect()[0][0]
for ex in exprs:
df = df.withColumn('test', when(col('expr') == lit(ex), expr(ex)))
df.show()
Run Code Online (Sandbox Code Playgroud)
+--------------------+----+----+----+
| expr|var1|var2|test|
+--------------------+----+----+----+
| var1 > 7| 9| 0|true|
| var1 > 7| 9| 0|true|
| var1 > 7| 9| 0|true|
| var1 > 7| 9| 0|true|
|var1 = 3 AND var2...| 9| 0|null|
|var1 = 3 AND var2...| 9| 0|null|
|var1 = 3 AND var2...| 9| 0|null|
|var1 = 3 AND var2...| 9| 0|null|
|var1 = 2 AND var2...| 9| 0|null|
+--------------------+----+----+----+
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
1940 次 |
最近记录: |