PYSPARK DF MAP:获取 Spark 映射中给定键的值

Hub*_*dek 6 apache-spark apache-spark-sql pyspark

我有带有地图的“国家”数据框:

+--------------------+
|                 map|
+--------------------+
|[1-> Spain        |
|[2-> Germany   ...|
|[3-> Czech Repu...|
|[4-> Malta     ...|
Run Code Online (Sandbox Code Playgroud)

如何使用键从地图访问值,然后如何使用地图数据帧从其他数据帧中的列映射值。

因此,从“销售”数据框来看,如下所示:

+--------------------+
|country_id | Sale   |
+--------------------+
|1          |200     |
|2          |565     |
Run Code Online (Sandbox Code Playgroud)

Country_id 值将映射到国家/地区(我们将删除 Country_id 列):

+--------------------+
|country    | Sale   |
+--------------------+
|Spain      |200     |
|Germany    |565     |
Run Code Online (Sandbox Code Playgroud)

我知道替代方法,例如使用联接或字典映射,但这里的问题仅涉及火花映射。尝试过诸如 element_at 之类的功能,但它没有正常工作。

pau*_*ult 8

如果您从示例中所示的两个数据帧开始,则获得所需输出的惯用方法是通过联接。(我假设您的地图数据帧相对于销售数据帧较小,您可能可以使用连接来逃脱broadcast。)

from pyspark.sql.functions import broadcast, col, explode, 
from pyspark.sql.types import IntegerType, MapType, StringType
from pyspark.sql.types import StructType, StructField

# set up data
map_df = spark.createDataFrame(
    [({1: "Spain"},),({2: "Germany"},),({3: "Czech Republic"},),({4: "Malta"},)],
    schema=StructType([StructField("map", MapType(IntegerType(), StringType()))])
)
sale_df = spark.createDataFrame([(1, 200), (2, 565)], ["country_id","Sale"])

# join
sale_df.join(
    broadcast(map_df.select(explode("map").alias("country_id", "country"))), 
    on="country_id",
    how="left"
).select("country", "Sale").show()
#+-------+----+
#|country|Sale|
#+-------+----+
#|  Spain| 200|
#|Germany| 565|
#+-------+----+
Run Code Online (Sandbox Code Playgroud)

相反,如果您将映射作为单个映射MapType,则可以通过在执行计划中向上推映射的评估来避免连接。

from pyspark.sql.functions import array, map_from_arrays, lit

my_dict = {1: "Spain", 2: "Germany", 3: "Czech Republic", 4: "Malta"}
my_map = map_from_arrays(
    array(*map(lit, my_dict.keys())),
    array(*map(lit, my_dict.values()))
)
print(my_map)
#Column<map_from_arrays(array(1, 2, 3, 4), array(Spain, Germany, Czech Republic, Malta))>
Run Code Online (Sandbox Code Playgroud)

getItem现在在您的 select 语句中使用:

sale_df.select(my_map.getItem(col("country_id")).alias("country"), "Sale").show()
#+-------+----+
#|country|Sale|
#+-------+----+
#|  Spain| 200|
#|Germany| 565|
#+-------+----+
Run Code Online (Sandbox Code Playgroud)

以及执行计划:

sale_df.select(my_map.getItem(col("country_id")).alias("country"), "Sale").explain()
#== Physical Plan ==
#*(1) Project [keys: [1,2,3,4], values: [Spain,Germany,Czech Republic,Malta][cast(country_id#6L as int)] AS country#62, Sale#7L]
#+- Scan ExistingRDD[country_id#6L,Sale#7L]
Run Code Online (Sandbox Code Playgroud)

您可以将第一个方法(DataFrame)中的数据转换为第二个方法吗?是的 - 但几乎可以肯定不值得这样做。