我正在尝试比较不同的方式来聚合我的数据.
这是我的输入数据,包含2个元素(页面,访问者):
(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)
Run Code Online (Sandbox Code Playgroud)
使用以下代码将SQL命令用于Spark SQL:
import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
"""select page
,count(distinct visitor) as visitor
from logs
group by page
""")
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)
Run Code Online (Sandbox Code Playgroud)
我得到这个输出:
(PAG1,3) // PAG1 has been visited by 3 different visitors
(PAG2,2) // PAG2 has been visited by 2 different visitors
Run Code Online (Sandbox Code Playgroud)
现在,我想使用Dataframes和他们的API获得相同的结果,但我无法获得相同的输出:
import sqlContext.implicits._
case class Log(page: String, visitor: …Run Code Online (Sandbox Code Playgroud) 我是Scala/Spark堆栈的新手,我正试图弄清楚如何使用SparkSql来测试我的基本技能,以便在TempTables中"映射"RDD,反之亦然.
我有两个不同的.scala文件,具有相同的代码:一个简单的对象(使用def main ...)和一个扩展App的对象.
在简单的对象中,我得到一个错误,因为"No TypeTag available"连接到我的case类Log:
object counter {
def main(args: Array[String]) {
.
.
.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
case class Log(visitatore: String, data: java.util.Date, pagina: String, count: Int)
val log = triple.map(p => Log(p._1,p._2,p._3,p._4))
log.registerTempTable("logs")
val logSessioni= sqlContext.sql("SELECT visitor, data, pagina, count FROM logs")
logSessioni.foreach(println)
}
Run Code Online (Sandbox Code Playgroud)
行中的错误:log.registerTempTable("logs")表示"没有TypeTag可用于日志".
在另一个文件(对象扩展App)中一切正常:
object counterApp extends App {
.
.
.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
case class Log(visitatore: String, data: java.util.Date, pagina: String, count: …Run Code Online (Sandbox Code Playgroud)