Pau*_*rco 5 scala apache-spark apache-spark-sql
我正在 Spark DataFrame 上应用许多转换(过滤器、groupBy、连接)。我想要每次转换后 DataFrame 中的行数。
我目前正在每次转换后使用函数 count() 来计算行数,但这每次都会触发一个操作,而这并没有真正优化。
我想知道是否有任何方法可以知道行数,而不必触发原始作业以外的其他操作。
您可以为每个阶段使用累加器,并在每个阶段之后递增映射中的累加器。然后,在您执行操作后,您将获得所有阶段的计数。
val filterCounter = spark.sparkContext.longAccumulator("filter-counter")
val groupByCounter = spark.sparkContext.longAccumulator("group-counter")
val joinCounter = spark.sparkContext.longAccumulator("join-counter")
myDataFrame
.filter(col("x") === lit(3))
.map(x => {
filterCounter.add(1)
x
}) .groupBy(col("x"))
.agg(max("y"))
.map(x => {
groupByCounter.add(1)
x
})
.join(myOtherDataframe, col("x") === col("y"))
.map(x => {
joinCounter.add(1)
x
})
.count()
print(s"count for filter = ${filterCounter.value}")
print(s"count for group by = ${groupByCounter.value}")
print(s"count for join = ${joinCounter.value}")
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4470 次 |
| 最近记录: |