根据列值是否在另一列中,将列添加到PySpark DataFrame

mar*_*tin 3 python apache-spark apache-spark-sql pyspark

我有一个PySpark DataFrame,其结构由

[('u1', 1, [1 ,2, 3]), ('u1', 4, [1, 2, 3])].toDF('user', 'item', 'fav_items')
Run Code Online (Sandbox Code Playgroud)

我需要添加一个带有1或0的列,具体取决于'item'是否在'fav_items'中.

所以我想要

[('u1', 1, [1 ,2, 3], 1), ('u1', 4, [1, 2, 3], 0)]
Run Code Online (Sandbox Code Playgroud)

我如何查找第二列到第三列来决定值以及如何添加它?

Hug*_*yes 7

以下代码执行所请求的任务.定义了一个用户定义的函数,它接收两列a DataFrame作为参数.因此,对于每一行,搜索项目是否在项目列表中.如果找到该项,则返回1,否则返回0.

# Imports
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
# First we create a RDD in order to create a dataFrame:
rdd = sc.parallelize([('u1', 1, [1 ,2, 3]), ('u1', 4, [1, 2, 3])])
df = rdd.toDF(['user', 'item', 'fav_items'])
# Print dataFrame
df.show()

# We make an user define function that receives two columns and do operation
function = udf(lambda item, items: 1 if item in items else 0, IntegerType())

df.select('user', 'item', 'fav_items', function(col('item'), col('fav_items')).alias('result')).show()
Run Code Online (Sandbox Code Playgroud)

结果如下:

+----+----+---------+
|user|item|fav_items|
+----+----+---------+
|  u1|   1|[1, 2, 3]|
|  u1|   4|[1, 2, 3]|
+----+----+---------+

+----+----+---------+------+
|user|item|fav_items|result|
+----+----+---------+------+
|  u1|   1|[1, 2, 3]|     1|
|  u1|   4|[1, 2, 3]|     0|
+----+----+---------+------+
Run Code Online (Sandbox Code Playgroud)