kev*_*kuo 143 scala apache-spark apache-spark-sql
假设我做的事情如下:
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()
root
|-- year: string (nullable = true)
|-- make: string (nullable = true)
|-- model: string (nullable = true)
|-- comment: string (nullable = true)
|-- blank: string (nullable = true)
df.show()
year make model comment blank
2012 Tesla S No comment
1997 Ford E350 Go get one now th...
Run Code Online (Sandbox Code Playgroud)
但我真的想要yearas Int(并且可能会转换其他一些列).
我能想到的最好的是
df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]
Run Code Online (Sandbox Code Playgroud)
这有点令人费解.
我来自R,我习惯于写作,例如
df2 <- df %>%
mutate(year = year %>% as.integer,
make = make %>% toupper)
Run Code Online (Sandbox Code Playgroud)
我可能会遗漏一些东西,因为在spark/scala中应该有更好的方法来做到这一点......
mse*_*man 133
由于spark 2.x你可以使用.withColumn.查看这里的文档:
从Spark 1.4版开始,您可以在列上应用带有DataType的强制转换方法:
import org.apache.spark.sql.types.IntegerType
val df2 = df.withColumn("yearTmp", df.year.cast(IntegerType))
.drop("year")
.withColumnRenamed("yearTmp", "year")
Run Code Online (Sandbox Code Playgroud)
如果您使用的是sql表达式,您还可以:
val df2 = df.selectExpr("cast(year as int) year",
"make",
"model",
"comment",
"blank")
Run Code Online (Sandbox Code Playgroud)
有关更多信息,请查看文档:http: //spark.apache.org/docs/1.6.0/api/scala/#org.apache.spark.sql.DataFrame
Sve*_*end 87
[编辑:2016年3月:感谢投票!但实际上,这不是最好的答案,我想基础上的解决方案withColumn,withColumnRenamed并cast提出msemelman,马丁Senne等是简单和清晰.
我认为你的方法还可以,回想一下Spark DataFrame是一个(不可变的)ROU的RDD,所以我们永远不会真正替换一个列,只是DataFrame每次都用新的模式创建新的.
假设您有一个具有以下架构的原始df:
scala> df.printSchema
root
|-- Year: string (nullable = true)
|-- Month: string (nullable = true)
|-- DayofMonth: string (nullable = true)
|-- DayOfWeek: string (nullable = true)
|-- DepDelay: string (nullable = true)
|-- Distance: string (nullable = true)
|-- CRSDepTime: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
并且一些UDF在一个或多个列上定义:
import org.apache.spark.sql.functions._
val toInt = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour = udf((t: String) => "%04d".format(t.toInt).take(2).toInt )
val days_since_nearest_holidays = udf(
(year:String, month:String, dayOfMonth:String) => year.toInt + 27 + month.toInt-12
)
Run Code Online (Sandbox Code Playgroud)
更改列类型甚至从另一个构建新的DataFrame可以这样写:
val featureDf = df
.withColumn("departureDelay", toDouble(df("DepDelay")))
.withColumn("departureHour", toHour(df("CRSDepTime")))
.withColumn("dayOfWeek", toInt(df("DayOfWeek")))
.withColumn("dayOfMonth", toInt(df("DayofMonth")))
.withColumn("month", toInt(df("Month")))
.withColumn("distance", toDouble(df("Distance")))
.withColumn("nearestHoliday", days_since_nearest_holidays(
df("Year"), df("Month"), df("DayofMonth"))
)
.select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth",
"month", "distance", "nearestHoliday")
Run Code Online (Sandbox Code Playgroud)
产量:
scala> df.printSchema
root
|-- departureDelay: double (nullable = true)
|-- departureHour: integer (nullable = true)
|-- dayOfWeek: integer (nullable = true)
|-- dayOfMonth: integer (nullable = true)
|-- month: integer (nullable = true)
|-- distance: double (nullable = true)
|-- nearestHoliday: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)
这非常接近您自己的解决方案.简单地说,将类型更改和其他转换保持为单独的udf vals使代码更易读和可重用.
Mar*_*nne 61
由于该cast操作适用于Spark Column(并且我个人不喜欢udf@ Svend此时提出的),如何:
df.select( df("year").cast(IntegerType).as("year"), ... )
Run Code Online (Sandbox Code Playgroud)
转换为请求的类型?作为一个整洁的副作用,在这个意义上,不可铸造/"可转换"的价值将成为null.
如果您需要将其作为辅助方法,请使用:
object DFHelper{
def castColumnTo( df: DataFrame, cn: String, tpe: DataType ) : DataFrame = {
df.withColumn( cn, df(cn).cast(tpe) )
}
}
Run Code Online (Sandbox Code Playgroud)
使用如下:
import DFHelper._
val df2 = castColumnTo( df, "year", IntegerType )
Run Code Online (Sandbox Code Playgroud)
Wei*_*Lin 46
首先,如果你想要铸造类型
import org.apache.spark.sql
df.withColumn("year", $"year".cast(sql.types.IntegerType))
Run Code Online (Sandbox Code Playgroud)
使用相同的列名称,该列将替换为新列,您无需添加和删除.
其次,关于Scala vs R. Scala代码与R最相似,我可以实现:
val df2 = df.select(
df.columns.map {
case year @ "year" => df(year).cast(IntegerType).as(year)
case make @ "make" => functions.upper(df(make)).as(make)
case other => df(other)
}: _*
)
Run Code Online (Sandbox Code Playgroud)
虽然长度比R长一点.请注意,这mutate是R数据帧的一个函数,因此Scala在不使用特殊函数的情况下非常好地提供了表达能力.
(df.columns令人惊讶的是Array [String]而不是Array [Column],也许他们希望它看起来像Python pandas的数据帧.)
dnl*_*rky 17
您可以使用selectExpr它来使它更清洁:
df.selectExpr("cast(year as int) as year", "upper(make) as make",
"model", "comment", "blank")
Run Code Online (Sandbox Code Playgroud)
man*_*are 12
用于将DataFrame的数据类型从String修改为Integer的Java代码
df.withColumn("col_name", df.col("col_name").cast(DataTypes.IntegerType))
Run Code Online (Sandbox Code Playgroud)
它只是将现有的(String数据类型)强制转换为Integer.
Piy*_*tel 10
我认为这对我来说更具可读性。
import org.apache.spark.sql.types._
df.withColumn("year", df("year").cast(IntegerType))
Run Code Online (Sandbox Code Playgroud)
这会将您的年份列转换为IntegerType创建任何临时列并删除这些列。如果要转换为任何其他数据类型,可以检查org.apache.spark.sql.types包内的类型。
小智 8
要将年份从字符串转换为int,您可以将以下选项添加到csv阅读器:"inferSchema" - >"true",请参阅DataBricks文档
所以这只有在你将问题保存到像sqlserver这样的jdbc驱动程序时才能真正起作用,但它对于你将遇到语法和类型的错误非常有帮助.
import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
import org.apache.spark.sql.jdbc.JdbcType
val SQLServerDialect = new JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")
override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
// case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
}
}
JdbcDialects.registerDialect(SQLServerDialect)
Run Code Online (Sandbox Code Playgroud)
小智 6
生成包含五个值的简单数据集并转换int为string类型:
val df = spark.range(5).select( col("id").cast("string") )
Run Code Online (Sandbox Code Playgroud)
建议使用强制转换(FYI)的答案已被破坏,火花1.4.1中的强制转换方法已损坏。
例如,当字符串转换为bigint时,其字符串列的值为“ 8182175552014127960”的数据框的值为“ 8182175552014128100”
df.show
+-------------------+
| a|
+-------------------+
|8182175552014127960|
+-------------------+
df.selectExpr("cast(a as bigint) a").show
+-------------------+
| a|
+-------------------+
|8182175552014128100|
+-------------------+
Run Code Online (Sandbox Code Playgroud)
在发现此错误之前,我们不得不面对很多问题,因为我们在生产中有bigint列。
| 归档时间: |
|
| 查看次数: |
321894 次 |
| 最近记录: |