假设我们有DataFrame这样的:
+--------+--------------+-----+--------------------+
|aid |bid |value| time|
+--------+--------------+-----+--------------------+
| 1| 1| 81.0|2006-08-25 14:13:...|
| 1| 1| 81.0|2006-08-25 14:27:...|
| 1| 2| 81.0|2006-08-25 14:56:...|
| 1| 2| 81.0|2006-08-25 15:00:...|
| 1| 3| 81.0|2006-08-25 15:31:...|
| 1| 3| 81.0|2006-08-25 15:38:...|
| 1| 4| 0.0|2006-08-30 11:59:...|
| 1| 4| 0.0|2006-08-30 13:59:...|
| 2| 1| 0.0|2006-08-30 12:11:...|
| 2| 1| 0.0|2006-08-30 14:13:...|
| 2| 2| 0.0|2006-08-30 12:30:...|
| 2| 2| 0.0|2006-08-30 14:30:...|
| 2| 3| 0.0|2006-09-05 12:29:...|
| 2| 3| 0.0|2006-09-05 14:31:...|
| 3| 1| 0.0|2006-09-05 12:42:...|
| 3| 1| 0.0|2006-09-05 14:43:...|
+--------+--------------+-----+--------------------+
Run Code Online (Sandbox Code Playgroud)
我知道我可以这样做:
df_data.where(col('bid')
.isin([1,2,3])).show()
Run Code Online (Sandbox Code Playgroud)
为了仅选择具有以下bid之一的行[1,2,3]。
但是,我希望能够根据[(1,1), (2,2), (3,1)]两列aid和的元组列表选择一个子集bid。
所以基本上“类似”
df_data.where(col(['aid', 'bid'])
.isin([(1,1), (2,2), (3,1)])).show()
Run Code Online (Sandbox Code Playgroud)
有没有办法做到这一点?
我可以想象这样的事情:
sql.sql('SELECT * FROM df_data WHERE (scope_id, measurement_id) IN ((1,1))')
Run Code Online (Sandbox Code Playgroud)
但这会抛出:
AnalysisException: "cannot resolve '(struct(df_data.`aid`, df_data.`bid`) IN (struct(1, 1)))' due to data type mismatch: Arguments must be same type; line 1 pos 55"
Run Code Online (Sandbox Code Playgroud)
我可以想到三种方法。
reduce帮助检查所有条件伪代码(s, m) IN [(1,1), (2,2), (3,1)]相当于:
(s == 1 and m == 1) or (s == 2 and m == 2) or (s == 3 and m == 3)
Run Code Online (Sandbox Code Playgroud)
您可以使用列表理解和 来检查所有这些条件reduce。
import pyspark.sql.functions as f
check_list = [(1,1), (2,2), (3,1)]
df.where(
reduce(
lambda u, v: u|v,
[(f.col("aid") == x) & (f.col("bid") == y) for (x,y) in check_list]
)
)\
.select("aid", "bid", "value")\
.show()
#+---+---+-----+
#|aid|bid|value|
#+---+---+-----+
#| 1| 1| 81.0|
#| 1| 1| 81.0|
#| 2| 2| 0.0|
#| 2| 2| 0.0|
#| 3| 1| 0.0|
#| 3| 1| 0.0|
#+---+---+-----+
Run Code Online (Sandbox Code Playgroud)
创建一个临时列作为两id列的字符串连接。然后检查该字符串是否与字符串列表匹配。
check_list = [(1,1), (2,2), (3,1)]
check_list_str = [",".join([str(x) for x in item]) for item in check_list]
df.withColumn("combined_id", f.concat(f.col("aid"), f.lit(","), f.col("bid")))\
.where(f.col("combined_id").isin(check_list_str))\
.select("aid", "bid", "value")\
.show()
#+---+---+-----+
#|aid|bid|value|
#+---+---+-----+
#| 1| 1| 81.0|
#| 1| 1| 81.0|
#| 2| 2| 0.0|
#| 2| 2| 0.0|
#| 3| 1| 0.0|
#| 3| 1| 0.0|
#+---+---+-----+
Run Code Online (Sandbox Code Playgroud)
创建一个udf来检查布尔条件。
check_list = [(1,1), (2,2), (3,1)]
check_id_isin = f.udf(lambda x, y: (x, y) in check_list, BooleanType())
df.where(check_id_isin(f.col("aid"), f.col("bid")) == True)\
.select("aid", "bid", "value")\
.show()
#+---+---+-----+
#|aid|bid|value|
#+---+---+-----+
#| 1| 1| 81.0|
#| 1| 1| 81.0|
#| 2| 2| 0.0|
#| 2| 2| 0.0|
#| 3| 1| 0.0|
#| 3| 1| 0.0|
#+---+---+-----+
Run Code Online (Sandbox Code Playgroud)
编辑正如@StefanFalk指出的那样,可以udf更一般地写为:
check_id_isin = f.udf(lambda *idx: idx in check_list, BooleanType())
Run Code Online (Sandbox Code Playgroud)
这将允许可变数量的输入参数。
| 归档时间: |
|
| 查看次数: |
2964 次 |
| 最近记录: |