Sca*_*Boy 6 scala apache-spark apache-spark-sql
我有df
一些数据框架,它是计算过程的结果。然后,我将此DataFrame存储在数据库中以备将来使用。
例如:
val rowsRDD: RDD[Row] = sc.parallelize(
Seq(
Row("first", 2.0, 7.0),
Row("second", 3.5, 2.5),
Row("third", 7.0, 5.9)
)
)
val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val1", DoubleType, true))
.add(StructField("val2", DoubleType, true))
val df = spark.createDataFrame(rowsRDD, schema)
Run Code Online (Sandbox Code Playgroud)
我需要检查最终DataFrame中的所有列是否都与特定的数据类型相对应。当然,一种方法是使用架构创建DataFrame(如上述示例)。但是,在某些情况下,有时会在计算过程中(在创建初始DataFrame之后)将更改引入数据类型(例如,当更改了应用于DataFrame的某些公式时)。
因此,我想仔细检查一下最终的 DataFrame是否对应于初始模式。如果不对应,那么我想应用相应的转换。有什么办法吗?
小智 12
基于来自https://spark.apache.org/docs/2.2.0/sql-programming-guide.html 的无类型数据集操作,它应该是:
df.printSchema()
小智 9
你可以试试
> df.printSchema
root
|-- id: string (nullable = true)
|-- val1: double (nullable = true)
|-- val2: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)
这会以树格式打印模式。希望这会有所帮助。
您可以使用schema方法获取数据框的架构
df.schema
Run Code Online (Sandbox Code Playgroud)
定义一个castColumn方法
def castColumn(df: DataFrame, colName: String, randomDataType: DataType): DataFrame =
df.withColumn(colName, df.col(colName).cast(randomDataType))
Run Code Online (Sandbox Code Playgroud)
然后将此方法应用于所有需要转换的列。
首先,获取一个带有colName和目标dataType的元组数组
//Assume your dataframes have the same column names, you need to sortBy in case the it is not in the same order
// You can also iterate through dfOrigin.schema only and compare their dataTypes with target dataTypes instead of zipping
val differences = (dfOrigin.schema.fields.sortBy{case (x: StructField) => x.name} zip dfTarget.schema.fields.sortBy{case (x: StructField) => x.name}).collect {
case (origin: StructField, target: StructField) if origin.dataType != target.dataType =>
(origin.name, target.dataType)
}
Run Code Online (Sandbox Code Playgroud)
然后
differences.foldLeft(df) {
case (acc, value) => castColumn(acc, value._1, value._2)
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
18092 次 |
最近记录: |