How to check if an array column is contained within another array column in a PySpark DataFrame

dtj*_*dtj 4 dataframe apache-spark apache-spark-sql pyspark pyspark-sql

Suppose I have the following:

from pyspark.sql.types import *

# Two array-of-string columns per row
schema = StructType([
    StructField("id", StringType(), True),
    StructField("ev", ArrayType(StringType()), True),
    StructField("ev2", ArrayType(StringType()), True),
])
df = spark.createDataFrame([{"id": "se1", "ev": ["ev11", "ev12"], "ev2": ["ev11"]},
                            {"id": "se2", "ev": ["ev11"], "ev2": ["ev11", "ev12"]},
                            {"id": "se3", "ev": ["ev21"], "ev2": ["ev11", "ev12"]},
                            {"id": "se4", "ev": ["ev21", "ev22"], "ev2": ["ev21", "ev22"]}],
                           schema=schema)

This gives me:

df.show()
+---+------------+------------+
| id|          ev|         ev2|
+---+------------+------------+
|se1|[ev11, ev12]|      [ev11]|
|se2|      [ev11]|[ev11, ev12]|
|se3|      [ev21]|[ev11, ev12]|
|se4|[ev21, ev22]|[ev21, ev22]|
+---+------------+------------+

I'd like to create a new boolean column flagging the rows where the contents of column "ev" are contained within column "ev2" (or just select the true cases), returning:

df_target.show()
+---+------------+------------+-------+
| id|          ev|         ev2|evInEv2|
+---+------------+------------+-------+
|se1|[ev11, ev12]|      [ev11]|  false|
|se2|      [ev11]|[ev11, ev12]|   true|
|se3|      [ev21]|[ev11, ev12]|  false|
|se4|[ev21, ev22]|[ev21, ev22]|   true|
+---+------------+------------+-------+

Or:

df_target.show()
+---+------------+------------+
| id|          ev|         ev2|
+---+------------+------------+
|se2|      [ev11]|[ev11, ev12]|
|se4|[ev21, ev22]|[ev21, ev22]|
+---+------------+------------+

I tried using the isin method:

df.withColumn('evInEv2', df['ev'].isin(df['ev2'])).show()
+---+------------+------------+-------+
| id|          ev|         ev2|evInEv2|
+---+------------+------------+-------+
|se1|[ev11, ev12]|      [ev11]|  false|
|se2|      [ev11]|[ev11, ev12]|  false|
|se3|      [ev21]|[ev11, ev12]|  false|
|se4|[ev21, ev22]|[ev21, ev22]|   true|
+---+------------+------------+-------+

But it looks like it only checks whether it is the exact same array, presumably because isin with a single column argument reduces to an equality test between the two arrays.

I also tried the array_contains function from pyspark.sql.functions, but it only accepts a single value to check for, not an array of values.
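
For illustration, a minimal sketch of what array_contains does handle, one literal value per call (the column name "has_ev11" is just for the example):

from pyspark.sql.functions import array_contains

# array_contains tests whether one given value occurs in the array column;
# it cannot take a second array column as the set of values to check
df.withColumn("has_ev11", array_contains(df.ev2, "ev11")).show()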

I'm even having trouble searching for this, since I don't know how to phrase the problem.

Thanks!

abi*_*sis 10

For Spark >= 2.4.0, an alternative implementation that avoids UDFs and uses the built-in array_except:

from pyspark.sql.functions import size, array_except

def is_subset(a, b):
    # a is a subset of b iff removing b's elements from a leaves an empty array
    return size(array_except(a, b)) == 0

df.withColumn("is_subset", is_subset(df.ev, df.ev2)).show()

Output:

+---+------------+------------+---------+
| id|          ev|         ev2|is_subset|
+---+------------+------------+---------+
|se1|[ev11, ev12]|      [ev11]|    false|
|se2|      [ev11]|[ev11, ev12]|     true|
|se3|      [ev21]|[ev11, ev12]|    false|
|se4|[ev21, ev22]|[ev21, ev22]|     true|
+---+------------+------------+---------+
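
Since is_subset returns a Column expression, the same check can also be used directly as a filter predicate to keep only the matching rows, which covers the "or just select the true cases" part of the question:

# Keep only the rows where every element of ev also appears in ev2
df.filter(is_subset(df.ev, df.ev2)).show()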


mto*_*oto 5

Here is an option using a udf, where we check the length of the set difference between ev and ev2. When the resulting difference has length 0, i.e. all elements of ev are contained in ev2, we return True; otherwise we return False.

from pyspark.sql.functions import udf

def contains(x, y):
    # An empty set difference means every element of x also appears in y
    z = len(set(x) - set(y))
    if z == 0:
        return True
    else:
        return False

contains_udf = udf(contains)
df.withColumn("evInEv2", contains_udf(df.ev, df.ev2)).show()
+---+------------+------------+-------+
| id|          ev|         ev2|evInEv2|
+---+------------+------------+-------+
|se1|[ev11, ev12]|      [ev11]|  false|
|se2|      [ev11]|[ev11, ev12]|   true|
|se3|      [ev21]|[ev11, ev12]|  false|
|se4|[ev21, ev22]|[ev21, ev22]|   true|
+---+------------+------------+-------+
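
One caveat: without an explicit return type, udf defaults to StringType, so the evInEv2 column above actually holds the strings "true" and "false" rather than booleans. A small sketch (reusing the contains function defined above; the name contains_bool_udf is just illustrative) that declares BooleanType so the result can be used directly in a filter:

from pyspark.sql.types import BooleanType

# Declaring the return type keeps the column a real boolean
contains_bool_udf = udf(contains, BooleanType())
df.filter(contains_bool_udf(df.ev, df.ev2)).show()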