sme*_*eeb 5 scala dataframe apache-spark apache-spark-sql
Scala 2.10在这里使用Spark 1.6.2.我有一个类似(但不相同)的问题作为这一个,然而,接受的答案是不是SSCCE并承担一定的"前期知识"关于星火; 因此我无法复制或理解它.更重要的是,该问题也仅限于向现有数据框添加新列,而我需要为数据框中的所有现有行添加列和值.
所以我想在现有的Spark DataFrame中添加一个列,然后将该新列的初始('default')值应用于所有行.
val json : String = """{ "x": true, "y": "not true" }"""
val rdd = sparkContext.parallelize(Seq(json))
val jsonDF = sqlContext.read.json(rdd)
jsonDF.show()
Run Code Online (Sandbox Code Playgroud)
当我运行时,我得到以下作为输出(通过.show()):
+----+--------+
| x| y|
+----+--------+
|true|not true|
+----+--------+
Run Code Online (Sandbox Code Playgroud)
现在我想jsonDF在创建它之后添加一个新字段,而不修改json字符串,这样得到的DF将如下所示:
+----+--------+----+
| x| y| z|
+----+--------+----+
|true|not true| red|
+----+--------+----+
Run Code Online (Sandbox Code Playgroud)
意思是,我想z在DF类型中添加一个新的" "列StringType,然后默认所有行包含z-value "red".
从另一个问题我将以下伪代码拼凑在一起:
val json : String = """{ "x": true, "y": "not true" }"""
val rdd = sparkContext.parallelize(Seq(json))
val jsonDF = sqlContext.read.json(rdd)
//jsonDF.show()
val newDF = jsonDF.withColumn("z", jsonDF("col") + 1)
newDF.show()
Run Code Online (Sandbox Code Playgroud)
但是当我运行它时,我在该.withColumn(...)方法上遇到编译器错误:
org.apache.spark.sql.AnalysisException: Cannot resolve column name "col" among (x, y);
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151)
at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664)
at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652)
Run Code Online (Sandbox Code Playgroud)
我也没有看到任何允许我设置"red"为默认值的API方法.关于我哪里出错的想法?
use*_*411 20
你可以使用lit功能.首先你必须导入它
import org.apache.spark.sql.functions.lit
Run Code Online (Sandbox Code Playgroud)
并使用它如下所示
jsonDF.withColumn("z", lit("red"))
Run Code Online (Sandbox Code Playgroud)
将自动推断列的类型.
| 归档时间: |
|
| 查看次数: |
10561 次 |
| 最近记录: |