Lok*_*oki 5 scala apache-spark apache-spark-sql
使用spark保存记录时,有没有办法获取写入的记录数?虽然我知道目前不符合规范,但我希望能够做到这样的事情:
val count = df.write.csv(path)
Run Code Online (Sandbox Code Playgroud)
或者,能够进行步骤结果的内联计数(优选地不使用标准累加器)将(几乎)同样有效.即:
dataset.countTo(count_var).filter({function}).countTo(filtered_count_var).collect()
Run Code Online (Sandbox Code Playgroud)
有任何想法吗?
我使用SparkListener它可以拦截onTaskEnd或onStageCompleted用于访问任务指标的事件.
任务指标为您提供Spark用于在SQL选项卡中显示指标的累加器(在查询的详细信息中).
例如,以下查询:
spark.
read.
option("header", true).
csv("../datasets/people.csv").
limit(10).
write.
csv("people")
Run Code Online (Sandbox Code Playgroud)
给出了10个输出行,因此Spark知道它(你也可以).
您还可以探索Spark SQL的QueryExecutionListener:
查询执行侦听器的接口,可用于分析执行度量标准.
您可以注册QueryExecutionListener使用ExecutionListenerManager可用的spark.listenerManager.
scala> :type spark.listenerManager
org.apache.spark.sql.util.ExecutionListenerManager
scala> spark.listenerManager.
clear clone register unregister
Run Code Online (Sandbox Code Playgroud)
我认为它更接近"裸机",但之前没有使用过.
@ D3V(在评论部分)提到numOutputRows使用QueryExecution结构化查询访问SQL指标.值得考虑的事情.
scala> :type q
org.apache.spark.sql.DataFrame
scala> :type q.queryExecution.executedPlan.metrics
Map[String,org.apache.spark.sql.execution.metric.SQLMetric]
q.queryExecution.executedPlan.metrics("numOutputRows").value
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2106 次 |
| 最近记录: |