YOL*_*OLO 4 python apache-spark pyspark
I want to know how to map values in a specific column of a dataframe.
I have a dataframe that looks like:
df = sc.parallelize([('india','japan'),('usa','uruguay')]).toDF(['col1','col2'])
+-----+-------+
| col1|   col2|
+-----+-------+
|india|  japan|
|  usa|uruguay|
+-----+-------+
I have a dictionary that I want to use to map the values.
dicts = sc.parallelize([('india','ind'), ('usa','us'),('japan','jpn'),('uruguay','urg')])
The output I want is:
+-----+-------+--------+--------+
| col1|   col2|col1_map|col2_map|
+-----+-------+--------+--------+
|india|  japan|     ind|     jpn|
|  usa|uruguay|      us|     urg|
+-----+-------+--------+--------+
I tried using the lookup function, but it doesn't work. It throws error SPARK-5063. Here is my failed approach:
def map_val(x):
    return dicts.lookup(x)[0]
myfun = udf(lambda x: map_val(x), StringType())
df = df.withColumn('col1_map', myfun('col1')) # doesn't work
df = df.withColumn('col2_map', myfun('col2')) # doesn't work
Ali*_*AzG 13
I think the easier way is to just use a plain dictionary with df.withColumn.
from itertools import chain
from pyspark.sql.functions import create_map, lit
simple_dict = {'india':'ind', 'usa':'us', 'japan':'jpn', 'uruguay':'urg'}
mapping_expr = create_map([lit(x) for x in chain(*simple_dict.items())])
df = df.withColumn('col1_map', mapping_expr[df['col1']])\
    .withColumn('col2_map', mapping_expr[df['col2']])
df.show(truncate=False)
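To make the `chain(*simple_dict.items())` step less opaque, here is a minimal plain-Python sketch of what it produces before each element is wrapped in `lit`. It runs without a Spark session; the variable name `flat` is only illustrative and does not appear in the answer.

```python
from itertools import chain

simple_dict = {'india': 'ind', 'usa': 'us', 'japan': 'jpn', 'uruguay': 'urg'}

# simple_dict.items() yields (key, value) pairs; chain(*...) flattens them
# into one alternating sequence: key, value, key, value, ...
# create_map takes its arguments in exactly that alternating order.
flat = list(chain(*simple_dict.items()))

print(flat)
```

Each element of that flat sequence is then turned into a literal column with `lit`, so `create_map` receives the keys and values as alternating column expressions.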
I suggest you change the list of tuples to a dict and broadcast it for use in a udf:
dicts = sc.broadcast(dict([('india','ind'), ('usa','us'),('japan','jpn'),('uruguay','urg')]))
from pyspark.sql import functions as f
from pyspark.sql import types as t
def newCols(x):
    # Look up x in the broadcast dictionary (evaluated on the executors)
    return dicts.value[x]
callnewColsUdf = f.udf(newCols, t.StringType())
df.withColumn('col1_map', callnewColsUdf(f.col('col1')))\
    .withColumn('col2_map', callnewColsUdf(f.col('col2')))\
    .show(truncate=False)
This should give you
+-----+-------+--------+--------+
|col1 |col2   |col1_map|col2_map|
+-----+-------+--------+--------+
|india|japan  |ind     |jpn     |
|usa  |uruguay|us      |urg     |
+-----+-------+--------+--------+
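One caveat with the udf above: `dicts.value[x]` raises a `KeyError` as soon as a column contains a value that is not in the dictionary, or a null. Below is a minimal sketch of a more forgiving lookup, written in plain Python so it runs without Spark. The names `mapping` and `safe_lookup` are hypothetical; `mapping` stands in for `dicts.value`.

```python
# Plain-Python stand-in for the broadcast dictionary (dicts.value in the answer).
mapping = {'india': 'ind', 'usa': 'us', 'japan': 'jpn', 'uruguay': 'urg'}

def safe_lookup(x, table=mapping, default=None):
    """Return the mapped value, or `default` when x is missing from the table."""
    # dict.get never raises for an unknown key; None is simply not a key here.
    return table.get(x, default)

print(safe_lookup('india'))
print(safe_lookup('brazil'))
```

Inside the udf the same idea would be `return dicts.value.get(x)`; a udf that returns `None` produces a null in the resulting column.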
Alternatively, all you have to do is convert the dicts RDD to a dataframe as well and use two joins with aliases, as follows:
df = sc.parallelize([('india','japan'),('usa','uruguay')]).toDF(['col1','col2'])
dicts = sc.parallelize([('india','ind'), ('usa','us'),('japan','jpn'),('uruguay','urg')]).toDF(['key', 'value'])
from pyspark.sql import functions as f
df.join(dicts, df['col1'] == dicts['key'], 'inner')\
    .select(f.col('col1'), f.col('col2'), f.col('value').alias('col1_map'))\
    .join(dicts, df['col2'] == dicts['key'], 'inner') \
    .select(f.col('col1'), f.col('col2'), f.col('col1_map'), f.col('value').alias('col2_map'))\
    .show(truncate=False)
which should give you the same result.