如何在spark scala中计算数据帧的大小

Yaz*_*iya 2 apache-spark spark-streaming apache-spark-sql

我想用重新分区编写一个大型数据帧,所以我想计算源数据帧的重新分区数。

numberofpartition= {数据帧大小/default_blocksize}

所以请告诉我如何在 spark scala 中计算数据帧的大小

提前致谢。

Sri*_*vas 8

使用spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes我们可以在加载到内存后获取实际 Dataframe 的大小,例如您可以查看以下代码。

scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]

scala> import org.apache.commons.io.FileUtils
import org.apache.commons.io.FileUtils

scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
bytes: BigInt = 763275709

scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
res5: String = 727 MB

scala> import sys.process._
import sys.process._

scala> "hdfs dfs -ls -h /tmp/srinivas/".!
Found 2 items
-rw-r-----   3 svcmxns hdfs          0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
-rw-r-----   3 svcmxns hdfs    727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
res6: Int = 0

Run Code Online (Sandbox Code Playgroud)

val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
    val dataSize = bytes.toLong
    val numPartitions = (bytes.toLong./(1024.0)./(1024.0)./(10240)).ceil.toInt // May be you can change or modify this to get required partitions.

    df.repartition(if(numPartitions == 0) 1 else numPartitions)
      .[...]

Run Code Online (Sandbox Code Playgroud)

Edit - 1 :请根据您的火花版本使用以下逻辑。

火花2.4

val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes

Run Code Online (Sandbox Code Playgroud)

火花2.3

val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats.sizeInBytes

Run Code Online (Sandbox Code Playgroud)

对于 Python

spark._jsparkSession.sessionState().executePlan(df._jdf.queryExecution().logical()).optimizedPlan().stats().sizeInBytes()

Run Code Online (Sandbox Code Playgroud)

  • 对我来说,这条指令:`spark.sessionState.executePlan(df.queryExecution.logic).optimizedPlan.stats.sizeInBytes`,对于一个小数据集返回太大的数字:`Statistics(sizeInBytes=8.11E+236 B,hints=无)`...有什么提示/建议吗?谢谢 (4认同)