如何在Spark SQL的DataFrame中更改列类型?

kev*_*kuo 143 scala apache-spark apache-spark-sql

假设我做的事情如下:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment                
1997 Ford  E350  Go get one now th...  
Run Code Online (Sandbox Code Playgroud)

但我真的想要yearas Int(并且可能会转换其他一些列).

我能想到的最好的是

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]
Run Code Online (Sandbox Code Playgroud)

这有点令人费解.

我来自R,我习惯于写作,例如

df2 <- df %>%
   mutate(year = year %>% as.integer, 
          make = make %>% toupper)
Run Code Online (Sandbox Code Playgroud)

我可能会遗漏一些东西,因为在spark/scala中应该有更好的方法来做到这一点......

mse*_*man 133

编辑:最新版本

由于spark 2.x你可以使用.withColumn.查看这里的文档:

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache.spark.sql.Column) :org.apache.spark.sql.DataFrame

最古老的答案

从Spark 1.4版开始,您可以在列上应用带有DataType的强制转换方法:

import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
    .drop("year")
    .withColumnRenamed("yearTmp", "year")
Run Code Online (Sandbox Code Playgroud)

如果您使用的是sql表达式,您还可以:

val df2 = df.selectExpr("cast(year as int) year", 
                        "make", 
                        "model", 
                        "comment", 
                        "blank")
Run Code Online (Sandbox Code Playgroud)

有关更多信息,请查看文档:http: //spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame

  • 没有必要删除列,然后重命名.你可以在一行中执行`df.withColumn("ctr",temp("ctr").cast(DecimalType(decimalPrecision,decimalScale))) (5认同)
  • 来自[docs](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@withColumn(colName:String,col:org.apache) .spark.sql.Column):org.apache.spark.sql.DataFrame)`Spark 2.x`,`df.withColumn(..)`可以**添加或替换**列,具体取决于`colName `论点 (5认同)
  • 为什么你跟着Column一起使用了?使用withColumn与原始列名称不是更容易吗? (4认同)

Sve*_*end 87

[编辑:2016年3月:感谢投票!但实际上,这不是最好的答案,我想基础上的解决方案withColumn,withColumnRenamedcast提出msemelman,马丁Senne等是简单和清晰.

我认为你的方法还可以,回想一下Spark DataFrame是一个(不可变的)ROU的RDD,所以我们永远不会真正替换一个列,只是DataFrame每次都用新的模式创建新的.

假设您有一个具有以下架构的原始df:

scala> df.printSchema
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

并且一些UDF在一个或多个列上定义:

import org.apache.spark.sql.functions._

val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt ) 
val days_since_nearest_holidays = udf( 
  (year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
 )
Run Code Online (Sandbox Code Playgroud)

更改列类型甚至从另一个构建新的DataFrame可以这样写:

val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour",  toHour(df("CRSDepTime")))
.withColumn("dayOfWeek",      toInt(df("DayOfWeek")))              
.withColumn("dayOfMonth",     toInt(df("DayofMonth")))              
.withColumn("month",          toInt(df("Month")))              
.withColumn("distance",       toDouble(df("Distance")))              
.withColumn("nearestHoliday", days_since_nearest_holidays(
              df("Year"), df("Month"), df("DayofMonth"))
            )              
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth", 
        "month", "distance", "nearestHoliday")            
Run Code Online (Sandbox Code Playgroud)

产量:

scala> df.printSchema
root
 |-- departureDelay: double (nullable = true)
 |-- departureHour: integer (nullable = true)
 |-- dayOfWeek: integer (nullable = true)
 |-- dayOfMonth: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- nearestHoliday: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

这非常接近您自己的解决方案.简单地说,将类型更改和其他转换保持为单独的udf vals使代码更易读和可重用.

  • 这既不安全也不高效.__Not safe__,因为单个"NULL"或格式错误的条目会导致整个作业崩溃.__Not effective__因为UDF对Catalyst不透明.使用UDF进行复杂操作很好,但没有理由将它们用于基本类型转换.这就是为什么我们有'cast`方法(参见[Martin Senne的回答](http://stackoverflow.com/a/32634826/1560062)).使事情对Catalyst透明需要更多的工作,但基本的安全只是让'Try`和`Option`工作. (21认同)
  • 有没有办法将`withColumn()`部分减少为遍历所有列的泛型部分? (3认同)

Mar*_*nne 61

由于该cast操作适用于Spark Column(并且我个人不喜欢udf@ Svend此时提出的),如何:

df.select( df("year").cast(IntegerType).as("year"), ... )
Run Code Online (Sandbox Code Playgroud)

转换为请求的类型?作为一个整洁的副作用,在这个意义上,不可铸造/"可转换"的价值将成为null.

如果您需要将其作为辅助方法,请使用:

object DFHelper{
  def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
    df.withColumn( cn, df(cn).cast(tpe) )
  }
}
Run Code Online (Sandbox Code Playgroud)

使用如下:

import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )
Run Code Online (Sandbox Code Playgroud)

  • 你能告诉我如何继续,如果我需要转换和重命名一大堆列(我有50列,而且scala相当新,不确定什么是最好的方法来处理它而不会产生大量的重复)?有些列应该保留String,有些列应该转换为Float. (2认同)

Wei*_*Lin 46

首先,如果你想要铸造类型

import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))
Run Code Online (Sandbox Code Playgroud)

使用相同的列名称,该列将替换为新列,您无需添加和删除.

其次,关于Scala vs R. Scala代码与R最相似,我可以实现:

val df2 = df.select(
   df.columns.map {
     case year @ "year" => df(year).cast(IntegerType).as(year)
     case make @ "make" => functions.upper(df(make)).as(make)
     case other         => df(other)
   }: _*
)
Run Code Online (Sandbox Code Playgroud)

虽然长度比R长一点.请注意,这mutate是R数据帧的一个函数,因此Scala在不使用特殊函数的情况下非常好地提供了表达能力.

(df.columns令人惊讶的是Array [String]而不是Array [Column],也许他们希望它看起来像Python pandas的数据帧.)


dnl*_*rky 17

您可以使用selectExpr它来使它更清洁:

df.selectExpr("cast(year as int) as year", "upper(make) as make",
    "model", "comment", "blank")
Run Code Online (Sandbox Code Playgroud)


man*_*are 12

用于将DataFrame的数据类型从String修改为Integer的Java代码

df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))
Run Code Online (Sandbox Code Playgroud)

它只是将现有的(String数据类型)强制转换为Integer.


Piy*_*tel 10

我认为这对我来说更具可读性。

import org.apache.spark.sql.types._
df.withColumn("year", df("year").cast(IntegerType))
Run Code Online (Sandbox Code Playgroud)

这会将您的年份列转换为IntegerType创建任何临时列并删除这些列。如果要转换为任何其他数据类型,可以检查org.apache.spark.sql.types包内的类型。


小智 8

要将年份从字符串转换为int,您可以将以下选项添加到csv阅读器:"inferSchema" - >"true",请参阅DataBricks文档

  • 这很好用,但问题是读者必须再次传递你的文件 (5认同)

ben*_*man 6

所以这只有在你将问题保存到像sqlserver这样的jdbc驱动程序时才能真正起作用,但它对于你将遇到语法和类型的错误非常有帮助.

import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
    case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
    case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
    case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
    case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
    case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
  }
}

JdbcDialects.registerDialect(SQLServerDialect)
Run Code Online (Sandbox Code Playgroud)


小智 6

生成包含五个值的简单数据集并转换intstring类型:

val df = spark.range(5).select( col("id").cast("string") )
Run Code Online (Sandbox Code Playgroud)


sau*_*I3h 5

建议使用强制转换(FYI)的答案已被破坏,火花1.4.1中的强制转换方法已损坏。

例如,当字符串转换为bigint时,其字符串列的值为“ 8182175552014127960”的数据框的值为“ 8182175552014128100”

    df.show
+-------------------+
|                  a|
+-------------------+
|8182175552014127960|
+-------------------+

    df.selectExpr("cast(a as bigint) a").show
+-------------------+
|                  a|
+-------------------+
|8182175552014128100|
+-------------------+
Run Code Online (Sandbox Code Playgroud)

在发现此错误之前,我们不得不面对很多问题,因为我们在生产中有bigint列。

  • psst,升级您的火花 (4认同)
  • @msemelman必须在生产环境中升级到新版本的Spark以解决一个小错误,这很荒谬。 (2认同)

sou*_*ine 5

df.select($"long_col".cast(IntegerType).as("int_col"))
Run Code Online (Sandbox Code Playgroud)