rab*_*nnh 1 scala dataframe apache-spark
This feels a little silly, but I am migrating from Spark 1.6.1 to Spark 2.0.2. I was using the Databricks CSV library and am now trying to use the built-in CSV DataFrameWriter.
The following code:
// Required imports (DataFrame, SQLContext, and the lit/sum column functions)
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{lit, sum}

// Get an SQLContext
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
var sTS = lTimestampToSummarize.toString()
val sS3InputPath = "s3://measurements/" + sTS + "/*"
// Read all measurements - note that all subsequent ETLs will reuse dfRaw
val dfRaw = sqlContext.read.json(sS3InputPath)
// Filter just the user/segment timespent records
val dfSegments = dfRaw.filter("segment_ts <> 0").withColumn("views", lit(1))
// Aggregate views and timespent per user/segment tuples
val dfUserSegments : DataFrame = dfSegments.groupBy("company_id", "division_id", "department_id", "course_id", "user_id", "segment_id")
.agg(sum("segment_ts").alias("segment_ts_sum"), sum("segment_est").alias("segment_est_sum"), sum("views").alias("segment_views"))
// The following will write CSV files to the S3 bucket
val sS3Output = "s3://output/" + sTS + "/usersegment/"
dfUserSegments.write.csv(sS3Output)
returns this error:
[error] /home/Spark/src/main/scala/Example.scala:75: type mismatch;
[error] found : Unit
[error] required: org.apache.spark.sql.DataFrame
[error] (which expands to) org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
[error] dfUserSegments.write.csv(sS3Output)
[error] ^
[error] one error found
[error] (compile:compile) Compilation failed
[error] Total time: 2 s, completed Jun 5, 2017 5:00:12 PM
I know I must be misreading the error, because I explicitly declared dfUserSegments as a DataFrame, yet the compiler is telling me it is of type Unit (no value).
Any help is appreciated.
You haven't shown the whole method. My guess is that the enclosing method's declared return type is DataFrame, but the last statement in that method is dfUserSegments.write.csv(sS3Output), and csv returns Unit.
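A minimal sketch of the likely fix, assuming the enclosing method is meant to return the aggregated DataFrame (the method name summarize and its parameters are assumptions, not from the question): keep the write as a side-effecting statement and make the DataFrame the last expression, or change the method's return type to Unit if nothing needs to be returned:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{lit, sum}

// Hypothetical enclosing method; only the body mirrors the code in the question
def summarize(sqlContext: SQLContext, sTS: String): DataFrame = {
  val sS3InputPath = "s3://measurements/" + sTS + "/*"
  val dfRaw = sqlContext.read.json(sS3InputPath)
  val dfSegments = dfRaw.filter("segment_ts <> 0").withColumn("views", lit(1))

  val dfUserSegments: DataFrame = dfSegments
    .groupBy("company_id", "division_id", "department_id",
             "course_id", "user_id", "segment_id")
    .agg(sum("segment_ts").alias("segment_ts_sum"),
         sum("segment_est").alias("segment_est_sum"),
         sum("views").alias("segment_views"))

  val sS3Output = "s3://output/" + sTS + "/usersegment/"
  // write.csv(...) returns Unit, so it must not be the method's last expression
  dfUserSegments.write.csv(sS3Output)

  // Return the DataFrame explicitly to satisfy the declared return type
  dfUserSegments
}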