I am trying to run a subquery inside a case statement in Pyspark, and it is throwing an exception. I am trying to create a new flag indicating whether an id in one table exists in another table.
Can anyone tell me whether this is possible in pyspark?
temp_df=spark.sql("select *, case when key in (select distinct key from Ids) then 1 else 0 end as flag from main_table")
This is the error:
AnalysisException: 'Predicate sub-queries can only be used in a Filter
This seems to be the most recent detailed documentation on subqueries - it concerns Spark 2.0, but I haven't seen any major updates on the topic since then.
The linked notebook in that reference makes it clear that indeed predicate subqueries are currently supported only within WHERE clauses. i.e. this would work (but of course would not yield the desired result):
spark.sql("select * from main_table where id in (select distinct id from ids_table)")
You could get the same result by using a left JOIN - that's what IN subqueries are generally translated into (for more details on that refer to the aforementioned linked notebook).
For example:
# imports needed for the snippets below
from pyspark.sql import SparkSession, functions as func

spark = SparkSession.builder.getOrCreate()

# set up some data
l1 = [('Alice', 1), ('Bob', 2), ('Eve', 3)]
df1 = spark.createDataFrame(l1, ['name', 'id'])
l2 = [(1,), (2,)]
df2 = spark.createDataFrame(l2, ['id'])
df1.createOrReplaceTempView("main_table")
df2.createOrReplaceTempView("ids_table")

# use a left join; unmatched rows from main_table get NULL in d.id
spark.sql("select * from main_table m left join ids_table d on (m.id = d.id)") \
    .withColumn('flag', func.when(func.col('d.id').isNull(), 0).otherwise(1)) \
    .drop('id').collect()
# result:
[Row(name='Bob', flag=1), Row(name='Eve', flag=0), Row(name='Alice', flag=1)]
Or, using the pyspark sql functions API rather than raw SQL syntax:
# rename to avoid an ambiguous column name after the join
df2 = df2.withColumnRenamed('id', 'id_faux')
df1.join(df2, df1.id == df2.id_faux, how='left') \
   .withColumn('flag', func.when(func.col('id_faux').isNull(), 0).otherwise(1)) \
   .drop('id_faux').collect()