这是Spark 1.3中的回归错误吗?

Era*_*dan 5 apache-spark apache-spark-sql

如果没有spark SQL 1.2.1中的弃用警告,则以下代码在1.3中停止工作

工作在1.2.1(没有任何弃用警告)

 val sqlContext = new HiveContext(sc)
 import sqlContext._
 val jsonRDD = sqlContext.jsonFile(jsonFilePath)
 jsonRDD.registerTempTable("jsonTable")

 val jsonResult = sql(s"select * from jsonTable")
 val foo = jsonResult.zipWithUniqueId().map {
   case (Row(...), uniqueId) => // do something useful
   ...
 }

 foo.registerTempTable("...")
Run Code Online (Sandbox Code Playgroud)

停止在1.3.0工作(根本不编译,我所做的只是改为1.3)

jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method
Run Code Online (Sandbox Code Playgroud)

不工作的解决方法:

虽然这可能会给我一个RDD [Row]:

jsonResult.rdd.zipWithUniqueId()  
Run Code Online (Sandbox Code Playgroud)

现在这不会起作用,因为当然RDD[Row]没有registerTempTable方法

     foo.registerTempTable("...")
Run Code Online (Sandbox Code Playgroud)

这是我的问题

  1. 有解决方法吗?(例如,我只是做错了吗?)
  2. 这是一个错误吗?(我认为在没有@deprecated警告的情况下,停止编译在以前版本中工作的任何内容显然都是回归错误)

Mic*_*ust 5

这不是一个错误,但对于混乱感到抱歉!直到Spark 1.3,Spark SQL被标记为Alpha组件,因为API仍在不断变化.使用Spark 1.3,我们毕业并稳定了API.有关移植时需要执行的操作的完整说明,请参阅文档.

我也可以回答您的具体问题,并为我们进行这些更改的原因提供一些理由

停止在1.3.0工作(根本不编译,我所做的只是改为1.3) jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method

DataFrame现在是Scala和Java的统一接口.但是,由于我们必须保持与现有RDD API的兼容性,DataFrames而不是RDDs.要获得RDD表示,您可以调用df.rdddf.javaRDD

另外,因为我们担心隐式转换可能会发生一些混乱,所以我们必须明确调用rdd.toDF以使RDD转换发生.但是,如果您的RDD包含继承自的对象Product(即元组或案例类),则此转换仅自动生效.

回到最初的问题,如果你想对任意模式的行进行转换,你需要在map操作之后显式地告诉Spark SQL数据的结构(因为编译器不能).

import org.apache.spark.sql.types._
val jsonData = sqlContext.jsonRDD(sc.parallelize("""{"name": "Michael", "zip": 94709}""" :: Nil))
val newSchema = 
  StructType(
    StructField("uniqueId", IntegerType) +: jsonData.schema.fields)

val augmentedRows = jsonData.rdd.zipWithUniqueId.map { 
  case (row, id) =>
    Row.fromSeq(id +: row.toSeq)
}

val newDF = sqlContext.createDataFrame(augmentedRows, newSchema)
Run Code Online (Sandbox Code Playgroud)