PySpark - 将地图功能添加为列

Bry*_*ind 5 rdd apache-spark-sql pyspark

我有一个pyspark DataFrame

a = [
    ('Bob', 562),
    ('Bob',880),
    ('Bob',380),
    ('Sue',85),
    ('Sue',963)
] 
df = spark.createDataFrame(a, ["Person", "Amount"])
Run Code Online (Sandbox Code Playgroud)

我需要创建一个哈希Amount并返回金额的列.问题是我无法使用,UDF所以我使用了映射功能.

df.rdd.map(lambda x: hash(x["Amount"]))
Run Code Online (Sandbox Code Playgroud)

pau*_*ult 12

如果您不能使用,udf您可以使用该map功能,但正如您目前所写的那样,只有一列.要保留所有列,请执行以下操作:

df = df.rdd\
    .map(lambda x: (x["Person"], x["Amount"], hash(str(x["Amount"]))))\
    .toDF(["Person", "Amount", "Hash"])

df.show()
#+------+------+--------------------+
#|Person|Amount|                Hash|
#+------+------+--------------------+
#|   Bob|   562|-4340709941618811062|
#|   Bob|   880|-7718876479167384701|
#|   Bob|   380|-2088598916611095344|
#|   Sue|    85|    7168043064064671|
#|   Sue|   963|-8844931991662242457|
#+------+------+--------------------+
Run Code Online (Sandbox Code Playgroud)

注意:在这种情况下,hash(x["Amount"])不是很有趣,所以我将其更改为hash Amount转换为字符串.

基本上,您必须将行映射到包含所有现有列的元组,并添加新列.

如果您的列太多而无法枚举,您还可以将元组添加到现有行.

df = df.rdd\
    .map(lambda x: x + (hash(str(x["Amount"])),))\
    .toDF(df.columns + ["Hash"])\
Run Code Online (Sandbox Code Playgroud)

我还应该指出,如果散列值是您的最终目标,还有一个pyspark函数pyspark.sql.functions.hash可用于避免序列化rdd:

import pyspark.sql.functions as f
df.withColumn("Hash", f.hash("Amount")).show()
#+------+------+----------+
#|Person|Amount|      Hash|
#+------+------+----------+
#|   Bob|   562|  51343841|
#|   Bob|   880|1241753636|
#|   Bob|   380| 514174926|
#|   Sue|    85|1944150283|
#|   Sue|   963|1665082423|
#+------+------+----------+
Run Code Online (Sandbox Code Playgroud)

这似乎使用了与python内置不同的散列算法.