PySpark-使用UDF从值列表中添加列

Bry*_*ind 3 python list user-defined-functions apache-spark-sql pyspark

我必须根据值列表将列添加到PySpark数据框。

a= spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])
Run Code Online (Sandbox Code Playgroud)

我有一个名为评价的列表,它是每个宠物的评价。

rating = [5,4,1]
Run Code Online (Sandbox Code Playgroud)

我需要在数据框后面附加一个称为Rating的列,这样

a= spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])
Run Code Online (Sandbox Code Playgroud)

我已完成以下操作,但是它仅返回“评级”列中列表中的第一个值

def add_labels():
    return rating.pop(0)

labels_udf = udf(add_labels, IntegerType())

new_df = a.withColumn('Rating', labels_udf()).cache()
Run Code Online (Sandbox Code Playgroud)

出:

rating = [5,4,1]
Run Code Online (Sandbox Code Playgroud)

小智 7

我可能是错的,但我相信接受的答案是行不通的。monotonically_increasing_id仅保证 id 将是唯一的且不断增加,而不是它们将是连续的。因此,在两个不同的数据帧上使用它可能会创建两个非常不同的列,并且连接将主要返回空。

从这个答案/sf/answers/3374831421/ 中汲取灵感,我们可以将错误答案更改为:

from pyspark.sql.window import Window as W
from pyspark.sql import functions as F

a= sqlContext.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],
                               ["Animal", "Enemy"])

a.show()

+------+-----+
|Animal|Enemy|
+------+-----+
|   Dog|  Cat|
|   Cat|  Dog|
| Mouse|  Cat|
+------+-----+



#convert list to a dataframe
rating = [5,4,1]
b = sqlContext.createDataFrame([(l,) for l in rating], ['Rating'])
b.show()

+------+
|Rating|
+------+
|     5|
|     4|
|     1|
+------+


a = a.withColumn("idx", F.monotonically_increasing_id())
b = b.withColumn("idx", F.monotonically_increasing_id())

windowSpec = W.orderBy("idx")
a = a.withColumn("idx", F.row_number().over(windowSpec))
b = b.withColumn("idx", F.row_number().over(windowSpec))

a.show()
+------+-----+---+
|Animal|Enemy|idx|
+------+-----+---+
|   Dog|  Cat|  1|
|   Cat|  Dog|  2|
| Mouse|  Cat|  3|
+------+-----+---+

b.show()
+------+---+
|Rating|idx|
+------+---+
|     5|  1|
|     4|  2|
|     1|  3|
+------+---+

final_df = a.join(b, a.idx == b.idx).drop("idx")

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+
Run Code Online (Sandbox Code Playgroud)

  • 没关系!但如果没有“分区”,这可能会导致大型数据集的性能严重下降。 (3认同)

mka*_*ran 6

正如@Tw UxTLi51Nus 所提到的,如果您可以通过 Animal 订购 DataFrame,而这不会改变您的结果,那么您可以执行以下操作:

def add_labels(indx):
    return rating[indx-1] # since row num begins from 1
labels_udf = udf(add_labels, IntegerType())

a = spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])
a.createOrReplaceTempView('a')
a = spark.sql('select row_number() over (order by "Animal") as num, * from a')

a.show()


+---+------+-----+
|num|Animal|Enemy|
+---+------+-----+
|  1|   Dog|  Cat|
|  2|   Cat|  Dog|
|  3| Mouse|  Cat|
+---+------+-----+

new_df = a.withColumn('Rating', labels_udf('num'))
new_df.show()
+---+------+-----+------+
|num|Animal|Enemy|Rating|
+---+------+-----+------+
|  1|   Dog|  Cat|     5|
|  2|   Cat|  Dog|     4|
|  3| Mouse|  Cat|     1|
+---+------+-----+------+
Run Code Online (Sandbox Code Playgroud)

然后删除num列:

new_df.drop('num').show()
+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Dog|  Cat|     5|
|   Cat|  Dog|     4|
| Mouse|  Cat|     1|
+------+-----+------+
Run Code Online (Sandbox Code Playgroud)

编辑:

另一种 - 但可能是丑陋且效率低下的 - 如果您不能按列排序,则返回到 rdd 并执行以下操作:

a = spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "Enemy"])

# or create the rdd from the start:
# a = spark.sparkContext.parallelize([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")])

a = a.rdd.zipWithIndex()
a = a.toDF()
a.show()

+-----------+---+
|         _1| _2|
+-----------+---+
|  [Dog,Cat]|  0|
|  [Cat,Dog]|  1|
|[Mouse,Cat]|  2|
+-----------+---+

a = a.select(bb._1.getItem('Animal').alias('Animal'), bb._1.getItem('Enemy').alias('Enemy'), bb._2.alias('num'))

def add_labels(indx):
    return rating[indx] # indx here will start from zero

labels_udf = udf(add_labels, IntegerType())

new_df = a.withColumn('Rating', labels_udf('num'))

new_df.show()

+---------+--------+---+------+
|Animal   |   Enemy|num|Rating|
+---------+--------+---+------+
|      Dog|     Cat|  0|     5|
|      Cat|     Dog|  1|     4|
|    Mouse|     Cat|  2|     1|
+---------+--------+---+------+
Run Code Online (Sandbox Code Playgroud)

(如果你有很多数据,我不会推荐这个)

希望这有帮助,祝你好运!


Pre*_*rem 6

希望这可以帮助!

from pyspark.sql.functions import monotonically_increasing_id

#sample data
a= sqlContext.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],
                               ["Animal", "Enemy"])
a.show()

#convert list to a dataframe
rating = [5,4,1]
b = sqlContext.createDataFrame([(l,) for l in rating], ['Rating'])

#join both dataframe to get the final result
a = a.withColumn("row_idx", monotonically_increasing_id())
b = b.withColumn("row_idx", monotonically_increasing_id())
final_df = a.join(b, a.row_idx == b.row_idx).\
             drop("row_idx")
final_df.show()
Run Code Online (Sandbox Code Playgroud)

输入:

+------+-----+
|Animal|Enemy|
+------+-----+
|   Dog|  Cat|
|   Cat|  Dog|
| Mouse|  Cat|
+------+-----+
Run Code Online (Sandbox Code Playgroud)

输出为:

+------+-----+------+
|Animal|Enemy|Rating|
+------+-----+------+
|   Cat|  Dog|     4|
|   Dog|  Cat|     5|
| Mouse|  Cat|     1|
+------+-----+------+
Run Code Online (Sandbox Code Playgroud)

  • 在更大的数据帧上尝试了这个答案,但列不匹配。 (2认同)