PySpark使用dict创建新列

ad_*_*d_s 13 python dictionary apache-spark apache-spark-sql pyspark

使用Spark 1.6,我有一个Spark DataFrame column(名为let col1),其值为A,B,C,DS,DNS,E,F,G和H,我想col2dict下面的值创建一个新列(比方说),我该如何映射?(所以''A'需要映射到'S'等......"

dict = {'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}
Run Code Online (Sandbox Code Playgroud)

use*_*411 33

使用UDF的低效解决方案(版本无关):

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def translate(mapping):
    def translate_(col):
        return mapping.get(col)
    return udf(translate_, StringType())

df = sc.parallelize([('DS', ), ('G', ), ('INVALID', )]).toDF(['key'])
mapping = {
    'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 
    'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}

df.withColumn("value", translate(mapping)("key"))
Run Code Online (Sandbox Code Playgroud)

结果:

+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+
Run Code Online (Sandbox Code Playgroud)

更高效(仅限Spark 2.0+)是创建一个MapType文字:

from pyspark.sql.functions import col, create_map, lit
from itertools import chain

mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])

df.withColumn("value", mapping_expr.getItem(col("key")))
Run Code Online (Sandbox Code Playgroud)

结果相同:

+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+
Run Code Online (Sandbox Code Playgroud)

但更有效的执行计划:

== Physical Plan ==
*Project [key#15, keys: [B,DNS,DS,F,E,H,C,G,A], values: [S,S,S,NS,NS,NS,S,NS,S][key#15] AS value#53]
+- Scan ExistingRDD[key#15]
Run Code Online (Sandbox Code Playgroud)

与UDF版本相比:

== Physical Plan ==
*Project [key#15, pythonUDF0#61 AS value#57]
+- BatchEvalPython [translate_(key#15)], [key#15, pythonUDF0#61]
   +- Scan ExistingRDD[key#15]
Run Code Online (Sandbox Code Playgroud)


Hai*_*nan 22

听起来最简单的解决方案是使用替换功能:http : //spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace

mapping= {
        'A': '1',
        'B': '2'
    }
df2 = df.replace(to_replace=mapping, subset=['yourColName'])
Run Code Online (Sandbox Code Playgroud)

  • 我喜欢这个解决方案的缺点。如果您想要一个额外的列,只需使用 `.withColumn("newColumn", "column_to_copy")` 左右复制该列 - 该示例仅提供您自己执行此操作所需知道的最少代码:) 有时我认为对 SO 的评论只是习惯了迂腐.. (5认同)
  • 这里的问题是,这不会创建新列,而是会替换原始列中的值。 (4认同)
  • 替换还要求新值与原始列的类型相同。 (3认同)
  • 您不能先将旧值复制到新列中,然后使用此函数吗? (2认同)