RDD CountApproximate比请求的超时时间长得多

jav*_*dba 5 scala apache-spark

为了减少收集行所花费的时间count,正在调用它.它有以下签名:DataFrameRDD.countApproximate()

def countApprox(
    timeout: Long,
    confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
Run Code Online (Sandbox Code Playgroud)

我试图将输出计算限制为60秒.另请注意:精度要求非常0.10:

val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean
Run Code Online (Sandbox Code Playgroud)

但实际时间是...... 17分钟

在此输入图像描述

那个时间几乎与首先生成数据所需的时间(19分钟)相同!

这样的话-什么是使用这个API的:有什么办法让它真正保存的一些有意义的部分确切时间如何计算?

TL; DR(参见接受的答案):使用initialValue而不是getFinalValue

had*_*per 7

请注意approxCount定义中的返回类型.这是部分结果.

def countApprox(
    timeout: Long,
    confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
Run Code Online (Sandbox Code Playgroud)

现在,请注意它的使用方法:

val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).**getFinalValue**.mean
Run Code Online (Sandbox Code Playgroud)

根据spark scala doc,getFinalValue阻塞方法意味着它将等待完成操作完成.

initialValue可以在指定的超时内获取.所以下面的代码片段在超时后不会阻止进一步的操作,

val waitSecs = 60
val cnt = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean
Run Code Online (Sandbox Code Playgroud)

请注意使用的缺点countApprox(timeout, confidence).initialValue是,即使在获得该值之后,它仍将继续计数,直到它获得您将获得的最终计数,getFinalValue并且仍将保留资源直到操作完成.

现在使用这个api不会在计数操作中被阻止.

参考:https://mail-archives.apache.org/mod_mbox/spark-user/201505.mbox/%3C747872034.1520543.1431544429083.JavaMail.yahoo@mail.yahoo.com%3E

现在让我们验证我们对spark2-shell的非阻塞操作的假设.让我们创建随机数据框中并执行count,approxCountgetFinalValueapproxCountinitialValue:

scala> val schema = StructType((0 to 10).map(n => StructField(s"column_$n", StringType)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(column_0,StringType,true), StructField(column_1,StringType,true), StructField(column_2,StringType,true), StructField(column_3,StringType,true), StructField(column_4,StringType,true), StructField(column_5,StringType,true), StructField(column_6,StringType,true), StructField(column_7,StringType,true), StructField(column_8,StringType,true), StructField(column_9,StringType,true), StructField(column_10,StringType,true))

scala> val rows = spark.sparkContext.parallelize(Seq[Row](), 100).mapPartitions { _ => { Range(0, 100000).map(m => Row(schema.map(_ => Random.alphanumeric.filter(_.isLower).head.toString).toList: _*)).iterator } }
rows: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[1] at mapPartitions at <console>:32

scala> val inputDf = spark.sqlContext.createDataFrame(rows, schema)
inputDf: org.apache.spark.sql.DataFrame = [column_0: string, column_1: string ... 9 more fields]

//Please note that cnt will be displayed only when all tasks are completed
scala> val cnt = inputDf.rdd.count
cnt: Long = 10000000

scala> val waitSecs = 60
waitSecs: Int = 60

//cntApproxFinal will be displayed only when all tasks are completed.
scala> val cntApprxFinal = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).getFinalValue.mean
[Stage 1:======================================================> (98 + 2) / 100]cntApprxFinal: Double = 1.0E7

scala> val waitSecs = 60
waitSecs: Int = 60

//Please note that cntApprxInitila in this case, will be displayed exactly after timeout duration. In this case 80 tasks were completed within timeout and it displayed the value of variable. Even after displaying the variable value, it continued will all the remaining tasks
scala> val cntApprxInitial = inputDf.rdd.countApprox(waitSecs * 1000, 0.10).initialValue.mean
[Stage 2:============================================>           (80 + 4) / 100]cntApprxInitial: Double = 1.0E7

[Stage 2:=======================================================>(99 + 1) / 100]
Run Code Online (Sandbox Code Playgroud)

让我们来看看spark ui和spark-shell,所有3个操作同时进行: 所有3个操作都花了相同的时间

cntApprxInitial 在完成所有任务之前可用.

cntApprxInitial在完成所有任务之前可用

希望这可以帮助!