小编the*_*tch的帖子

计算Spark DataFrame中分组数据的标准差

我有一个用户日志,我从csv获取并转换为DataFrame,以利用SparkSQL查询功能.单个用户每小时会创建大量条目,我想为每个用户收集一些基本的统计信息; 实际上只是用户实例的数量,平均值以及众多列的标准偏差.我能够通过使用groupBy($"user")以及用于count和avg的SparkSQL函数的聚合器来快速获取均值和计数信息:

val meanData = selectedData.groupBy($"user").agg(count($"logOn"),
avg($"transaction"), avg($"submit"), avg($"submitsPerHour"), avg($"replies"),
avg($"repliesPerHour"), avg($"duration"))
Run Code Online (Sandbox Code Playgroud)

但是,我似乎无法找到同样优雅的方法来计算标准偏差.到目前为止,我只能通过映射字符串,双对并使用StatCounter()来计算它.stdev实用程序:

val stdevduration = duration.groupByKey().mapValues(value =>
org.apache.spark.util.StatCounter(value).stdev)
Run Code Online (Sandbox Code Playgroud)

然而,这会返回一个RDD,我想尝试将其全部保存在DataFrame中,以便对返回的数据进行进一步的查询.

scala apache-spark apache-spark-sql

15
推荐指数
2
解决办法
3万
查看次数

Akka SLF4J和Scala中的logback

我正在尝试为我的akka​​ actor系统设置一些基本的日志记录,但到目前为止我只获取标准日志而不是我添加的日志或输出文件.我已经跟踪了用于日志记录akka文档,并设置了以下内容:

  • 我将这些依赖项添加到build.sbt文件中

    "com.typesafe.akka" %% "akka-slf4j" % "2.3.14"
    "ch.qos.logback" % "logback-classic" % "1.0.9"
    
    Run Code Online (Sandbox Code Playgroud)
  • 我将其添加到application.conf文件中

    akka {
        loggers = ["akka.event.slf4j.Slf4jLogger"]
        loglevel = "DEBUG"
    }  
    
    Run Code Online (Sandbox Code Playgroud)
  • logback.xml位于src/main/resources中

    <configuration>
        <appender name="FILE" class="ch.qos.logback.core.FileAppender">
            <File>./logs/akka.log</File>
            <encoder>
                <pattern>%d{HH:mm:ss.SSS} [%-5level] %msg%n</pattern>
            </encoder>
        </appender>
        <root level="info">
            <appender-ref ref="FILE" />
        </root>
    </configuration>
    
    Run Code Online (Sandbox Code Playgroud)
  • 这就是我跳跃应该做的日志记录

    import akka.event.Logging
    
    val log = Logging(context.system, classOf[TickActor])
    log.info("Good Luck!")
    
    Run Code Online (Sandbox Code Playgroud)

我没有收到标准日志记录中的失败消息,也没有找到与我已有的解决方案有很大不同的其他解决方案.我已经尝试过这个问题的建议.这似乎是我遇到的同样问题,但这些建议没有用.我错过了一步或配置错了吗?

logging scala logback slf4j akka

7
推荐指数
1
解决办法
2856
查看次数

如何计算由 Spark 中的 (Key, [Value]) 对组成的 RDD 中每对的平均值?

我对 Scala 和 Spark 都很陌生,所以如果我完全错误地解决了这个问题,请原谅我。导入一个csv文件,过滤,映射后;我有一个 RDD,它是一堆 (String, Double) 对。

(b2aff711,-0.00510)
(ae095138,0.20321)
(etc.)
Run Code Online (Sandbox Code Playgroud)

当我在 RDD 上使用 .groupByKey() 时,

val grouped = rdd1.groupByKey()
Run Code Online (Sandbox Code Playgroud)

获得带有一堆 (String, [Double]) 对的 RDD。(我不知道 CompactBuffer 是什么意思,也许会导致我的问题?)

(32540b03,CompactBuffer(-0.00699, 0.256023))
(a93dec11,CompactBuffer(0.00624))
(32cc6532,CompactBuffer(0.02337, -0.05223, -0.03591))
(etc.)
Run Code Online (Sandbox Code Playgroud)

一旦它们被分组,我就会尝试取平均值和标准偏差。我想简单地使用 .mean() 和 .sampleStdev()。当我尝试创建一个新的 RDD 方法时,

val mean = grouped.mean()
Run Code Online (Sandbox Code Playgroud)

返回错误

错误:(51, 22) value mean 不是 org.apache.spark.rdd.RDD[(String, Iterable[Double])] 的成员

val mean = grouped.mean()

我已经导入了 org.apache.spark.SparkContext._
我也试过使用 sampleStdev()、.sum()、.stats() 得到相同的结果。不管是什么问题,它似乎都会影响到所有的数字 RDD 操作。

scala apache-spark

5
推荐指数
1
解决办法
7962
查看次数

标签 统计

scala ×3

apache-spark ×2

akka ×1

apache-spark-sql ×1

logback ×1

logging ×1

slf4j ×1