Spark DataFrame: extract a column based on the value of another column

Tom*_*101 3 scala dataframe apache-spark apache-spark-sql

I have a DataFrame of transactions joined with a price list:

+----------+----------+------+-------+-------+
|   paid   | currency | EUR  |  USD  |  GBP  |
+----------+----------+------+-------+-------+
|   49.5   |   EUR    | 99   |  79   |  69   |
+----------+----------+------+-------+-------+

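The customer paid 49.5 in EUR, as shown in the "currency" column. I now want to compare the price paid with the price from the price list.

So I need to access the correct column based on the value of "currency", like this: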

df.withColumn("saved", df.col(df.col($"currency")) - df.col("paid"))

which I hoped would become

df.withColumn("saved", df.col("EUR") - df.col("paid"))

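However, this fails. I have tried everything I could think of, including a UDF, and got nowhere.

I guess there is some elegant solution for this? Can somebody help?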

zer*_*323 5

Assuming that the column names match the values in the currency column:

// `when` is used in listedPrice below, so it has to be imported here as well
import org.apache.spark.sql.functions.{coalesce, col, lit, when}
import org.apache.spark.sql.Column

// Dummy data
val df = sc.parallelize(Seq(
  (49.5, "EUR", 99, 79, 69), (100.0, "GBP", 80, 120, 50)
)).toDF("paid", "currency", "EUR", "USD", "GBP")

// A list of available currencies 
val currencies: List[String] = List("EUR", "USD", "GBP")

// Select listed value
val listedPrice: Column = coalesce(
  currencies.map(c => when($"currency" === c, col(c)).otherwise(lit(null))): _*)

df.select($"*", (listedPrice - $"paid").alias("difference")).show

// +-----+--------+---+---+---+----------+
// | paid|currency|EUR|USD|GBP|difference|
// +-----+--------+---+---+---+----------+
// | 49.5|     EUR| 99| 79| 69|      49.5|
// |100.0|     GBP| 80|120| 50|     -50.0|
// +-----+--------+---+---+---+----------+

The SQL equivalent of the listedPrice expression is something like this:

COALESCE(
  CASE WHEN (currency = 'EUR') THEN EUR ELSE null END,
  CASE WHEN (currency = 'USD') THEN USD ELSE null END,
  CASE WHEN (currency = 'GBP') THEN GBP ELSE null END
)
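
To make the row-level behavior of this expression concrete, here is a minimal plain-Scala model of it, with no Spark involved. The names CoalesceModel, Transaction, listedPrice and difference are hypothetical and exist only for this illustration; Option stands in for SQL null.

```scala
object CoalesceModel {
  // Plain-Scala stand-in for one row: amount paid, currency paid in,
  // and the listed prices keyed by currency code.
  final case class Transaction(paid: Double, currency: String, prices: Map[String, Double])

  // Each candidate is defined only when the row's currency matches,
  // mirroring CASE WHEN currency = c THEN c ELSE null END.
  // Taking the first defined candidate mirrors COALESCE.
  def listedPrice(t: Transaction, currencies: List[String]): Option[Double] =
    currencies
      .map(c => if (t.currency == c) t.prices.get(c) else None)
      .collectFirst { case Some(price) => price }

  // Listed price minus amount paid; None plays the role of a null result.
  def difference(t: Transaction, currencies: List[String]): Option[Double] =
    listedPrice(t, currencies).map(_ - t.paid)
}
```

A row whose currency is not in the list yields None here, just as the Spark expression yields null for it.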

An alternative using foldLeft:

import org.apache.spark.sql.functions.when

val listedPriceViaFold = currencies.foldLeft(
  lit(null))((acc, c) => when($"currency" === c, col(c)).otherwise(acc))

df.select($"*", (listedPriceViaFold - $"paid").alias("difference")).show

// +-----+--------+---+---+---+----------+
// | paid|currency|EUR|USD|GBP|difference|
// +-----+--------+---+---+---+----------+
// | 49.5|     EUR| 99| 79| 69|      49.5|
// |100.0|     GBP| 80|120| 50|     -50.0|
// +-----+--------+---+---+---+----------+

where listedPriceViaFold translates to the following SQL:

CASE
  WHEN (currency = 'GBP') THEN GBP
  ELSE CASE
    WHEN (currency = 'USD') THEN USD
    ELSE CASE
      WHEN (currency = 'EUR') THEN EUR
      ELSE null
    END
  END
END
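
The nesting order (the last currency in the list ends up outermost) follows from how foldLeft wraps the accumulator at each step. As a rough illustration, the plain-Scala sketch below builds the SQL text the same way, using strings instead of Spark columns. The object name FoldNesting is hypothetical.

```scala
object FoldNesting {
  val currencies: List[String] = List("EUR", "USD", "GBP")

  // Start from "null" and wrap the previous result in a new CASE for
  // every currency, so each later currency encloses the earlier ones.
  val nested: String = currencies.foldLeft("null") { (acc, c) =>
    s"CASE WHEN currency = '$c' THEN $c ELSE $acc END"
  }
}
```

Because every branch tests a different currency value, the order of nesting does not change the result, only the shape of the generated expression.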

Unfortunately, I am not aware of any built-in function that can directly express SQL like this:

CASE currency
    WHEN 'EUR' THEN EUR
    WHEN 'USD' THEN USD
    WHEN 'GBP' THEN GBP
    ELSE null
END

but you can use this construct in raw SQL.
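
One possible way to take the raw SQL route is to generate the simple CASE expression as text from the list of currencies. This is only a sketch: CaseSql and simpleCase are hypothetical names, not Spark APIs, and it assumes every currency code is a plain column identifier that needs no quoting.

```scala
object CaseSql {
  // Hypothetical helper: builds the text of a simple CASE expression that
  // selects the column named after the value found in keyColumn.
  def simpleCase(keyColumn: String, currencies: Seq[String]): String = {
    // One "WHEN 'EUR' THEN EUR" branch per currency code.
    val branches = currencies.map(c => s"WHEN '$c' THEN $c")
    (Seq(s"CASE $keyColumn") ++ branches ++ Seq("ELSE null END")).mkString(" ")
  }
}
```

With Spark available, the resulting string could be handed to `org.apache.spark.sql.functions.expr`, for example `df.withColumn("difference", expr(CaseSql.simpleCase("currency", currencies)) - col("paid"))`.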

If my assumption does not hold, you can simply add a mapping between the column names and the values in the currency column.
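
As an illustration of such a mapping, the sketch below pairs each currency value with the column that holds its price and generates a CASE expression from those pairs. The column names price_eur, price_usd and price_gbp are invented for the example, as are the names MappedCase, priceColumns and caseExpr.

```scala
object MappedCase {
  // Hypothetical mapping: value in the currency column -> price column name.
  val priceColumns: Seq[(String, String)] = Seq(
    "EUR" -> "price_eur",
    "USD" -> "price_usd",
    "GBP" -> "price_gbp"
  )

  // Builds a simple CASE expression that looks up the mapped column.
  // Assumes the column names are plain identifiers that need no quoting.
  def caseExpr(keyColumn: String, mapping: Seq[(String, String)]): String = {
    val branches = mapping.map { case (value, column) => s"WHEN '$value' THEN $column" }
    (Seq(s"CASE $keyColumn") ++ branches ++ Seq("ELSE null END")).mkString(" ")
  }
}
```

The same pairs could equally drive the `when`/`coalesce` or `foldLeft` variants, comparing against the first element of each pair and selecting `col` of the second.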

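Edit:

Another option, which may be efficient if the source supports predicate pushdown and efficient column pruning, is to subset by currency and union the results: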

currencies.map(
  // For each currency, keep only its rows and add listed price minus paid
  // (same sign as the other variants and as the SQL equivalent)
  c => df.where($"currency" === c).withColumn("difference", col(c) - $"paid")
).reduce((df1, df2) => df1.unionAll(df2)) // Union

It is equivalent to SQL like this:

SELECT *,  EUR - paid AS difference FROM df WHERE currency = 'EUR'
UNION ALL
SELECT *,  USD - paid AS difference FROM df WHERE currency = 'USD'
UNION ALL
SELECT *,  GBP - paid AS difference FROM df WHERE currency = 'GBP'