Spark:如何在Dataframe API中翻译count(distinct(value))

Fab*_*oni 27 count distinct dataframe apache-spark apache-spark-sql

我正在尝试比较不同的方式来聚合我的数据.

这是我的输入数据,包含2个元素(页面,访问者):

(PAG1,V1)
(PAG1,V1)
(PAG2,V1)
(PAG2,V2)
(PAG2,V1)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG1,V2)
(PAG1,V1)
(PAG2,V2)
(PAG1,V3)
Run Code Online (Sandbox Code Playgroud)

使用以下代码将SQL命令用于Spark SQL:

import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Log(p._1,p._2)).toDF()
logs.registerTempTable("logs")
val sqlResult= sqlContext.sql(
                              """select page
                                       ,count(distinct visitor) as visitor
                                   from logs
                               group by page
                              """)
val result = sqlResult.map(x=>(x(0).toString,x(1).toString))
result.foreach(println)
Run Code Online (Sandbox Code Playgroud)

我得到这个输出:

(PAG1,3) // PAG1 has been visited by 3 different visitors
(PAG2,2) // PAG2 has been visited by 2 different visitors
Run Code Online (Sandbox Code Playgroud)

现在,我想使用Dataframes和他们的API获得相同的结果,但我无法获得相同的输出:

import sqlContext.implicits._
case class Log(page: String, visitor: String)
val logs = data.map(p => Coppia(p._1,p._2)).toDF()
val result = log.select("page","visitor").groupBy("page").count().distinct
result.foreach(println)
Run Code Online (Sandbox Code Playgroud)

事实上,这就是我得到的输出:

[PAG1,8]  // just the simple page count for every page
[PAG2,4]
Run Code Online (Sandbox Code Playgroud)

这可能是愚蠢的,但我现在看不到它.

提前致谢!

FF

yjs*_*hen 52

你需要的是DataFrame聚合函数countDistinct:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

case class Log(page: String, visitor: String)

val logs = data.map(p => Log(p._1,p._2))
            .toDF()

val result = logs.select("page","visitor")
            .groupBy('page)
            .agg('page, countDistinct('visitor))

result.foreach(println)
Run Code Online (Sandbox Code Playgroud)

  • 我收到此错误 - >未找到:value countDistinct (2认同)
  • 这是`org.apache.spark.sql.functions`中的一个方法,导入它:),编辑完成。 (2认同)
  • @Panto,您是否在功能后添加了下划线?`org.apache.spark.sql.functions._` (2认同)

Abu*_*oeb 5

您可以使用 dataframe 的groupBy命令两次来执行此操作。这df1是您的原始输入。

val df2 = df1.groupBy($"page",$"visitor").agg(count($"visitor").as("count"))
Run Code Online (Sandbox Code Playgroud)

该命令将产生以下结果:

page  visitor  count
----  ------   ----
PAG2    V2       2
PAG1    V3       1
PAG1    V1       5
PAG1    V2       2
PAG2    V1       2
Run Code Online (Sandbox Code Playgroud)

然后再次使用该groupBy命令即可得到最终结果。

 df2.groupBy($"page").agg(count($"visitor").as("count"))
Run Code Online (Sandbox Code Playgroud)

最终输出:

page   count
----   ----
PAG1    3
PAG2    2
Run Code Online (Sandbox Code Playgroud)