为什么Spark Row对象与等效结构相比如此之大?

Dav*_*fin 9 apache-spark

我一直在玩这个java-sizeof库(https://github.com/phatak-dev/java-sizeof)并用它来测量Apache Spark中的数据集大小.事实证明,这个Row物体非常大.就像非常大 - 为什么会这样?

采取一个相当简单的架构:

root
 |-- account: string (nullable = true)
 |-- date: long (nullable = true)
 |-- dialed: string (nullable = true)
 |-- duration: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)

示例数据如下所示:

+-------+-------------+----------+--------+
|account|         date|    dialed|duration|
+-------+-------------+----------+--------+
|   5497|1434620384003|9075112643|   790.0|
+-------+-------------+----------+--------+
Run Code Online (Sandbox Code Playgroud)

所以现在我们做:

val row = df.take(1)(0)
// row: org.apache.spark.sql.Row = [5497,1434620384003,9075112643,790.0]
Run Code Online (Sandbox Code Playgroud)

所以现在我用 SizeEstimator

SizeEstimator.estimate(row)
// res19: Long = 85050896
Run Code Online (Sandbox Code Playgroud)

81兆字节!对于单排!认为这是某种错误,我这样做:

SizeEstimator.estimate(df.take(100))
// res20: Long = 85072696
Run Code Online (Sandbox Code Playgroud)

有趣的是,尽管拥有100倍的数据量,但它并没有大得多 - 只有大约20k.高于100,似乎是线性的.对于1,000行,它看起来像这样:

SizeEstimator.estimate(df.take(1000))
// res21: Long = 850711696
Run Code Online (Sandbox Code Playgroud)

好的,所以它比100行大10倍 - 或多或少是线性的.从测试中,它以线性方式增加,持续超过100行.基于这些测试,在大约100行之后,每行对象的成本仍然超过800 KB!

出于好奇,我为相同的底层数据尝试了几种不同的对象类型.例如,这里有一个结果ArrayArray对象而不是Row对象:

SizeEstimator.estimate(
  df.map(r => (r.getString(0), r.getLong(1), r.getString(2), r.getDouble(3))).take(1)
)
// res22: Long = 216
Run Code Online (Sandbox Code Playgroud)

好的,那好一点.更好的是,对于10行,它只有1976字节,对于100行,它只有19,616字节.绝对朝着正确的方向前进.

然后,我编码相同DataFrameRDD[Array[Byte]]地方,每个Array[Byte]都是二进制编码的Avro记录,具有与底层相同的模式DataFrame.然后我做:

SizeEstimator.estimate(encodedRdd.take(1))
// res23: Long = 72
Run Code Online (Sandbox Code Playgroud)

72字节 - 甚至更好!并且,对于100行,它是5,216个字节 - 每行约52个字节,并且它从那里继续向下(对于1,000个记录为48,656个字节).

因此,在最好的情况下,Row对象的重量为每个850k Row,而Avro相同数据的二进制记录大约为50个字节.

到底是怎么回事??

zer*_*323 5

实际上Row它本身并不是那么大.这就是为什么当您占用更多行时,您看不到大小的重大变化.问题似乎是架构信息:

  1. 当您收集实际获得的数据时 GenericRowWithSchema

    val df = Seq((1, "foo"), (2, "bar")).toDF
    df.first.getClass
    
    // res12: Class[_ <: org.apache.spark.sql.Row] = 
    //   class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    
    Run Code Online (Sandbox Code Playgroud)
  2. GenericRowWithSchemaschema参数传递架构信息:

    class GenericRowWithSchema(values: Array[Any], 
      override val schema: StructType)
    
    Run Code Online (Sandbox Code Playgroud)
  3. 让我们确认这确实是问题的根源:

    import com.madhukaraphatak.sizeof.SizeEstimator
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    
    val rowWithSchema = df.first 
    val rowWithoutSchema = new GenericRowWithSchema(
      rowWithSchema.toSeq.toArray, null)
    
    SizeEstimator.estimate(rowWithSchema)
    // Long = 1444255708
    
    SizeEstimator.estimate(rowWithoutSchema)
    // Long = 120
    
    Run Code Online (Sandbox Code Playgroud)
  4. 假设:您看到的估计大小包括模式的大小:

    SizeEstimator.estimate(df.schema)
    // Long = 1444361928
    
    Run Code Online (Sandbox Code Playgroud)

    这与收集的行大致相同.让我们从头开始创建一个新架构:

    import org.apache.spark.sql.types._
    
    val schema = StructType(Seq(
      StructField("_1",IntegerType,false), 
      StructField("_2",StringType,true)))
    
    
    val anotherRowWithSchema = new GenericRowWithSchema(
      Array(0, "foo"), schema) 
    
    SizeEstimator.estimate(anotherRowWithSchema)
    // Long = 1444905324
    
    Run Code Online (Sandbox Code Playgroud)

    所以你可以看到结果是一致的.

  5. 为什么架构如此之大?很难说.当您查看代码时,您会看到这 StructType是一个复杂的类,甚至不包括其伴随对象,而不是简单的模式定义.

    但它没有解释报告的大小.我怀疑这可能是一些侥幸,SizeEstimator但我还不确定.

  6. 您可以进一步隔离问题但估计单个大小StructField:

    import org.apache.spark.sql.types._
    import com.madhukaraphatak.sizeof.SizeEstimator
    
    object App {
      def main(args: Array[String]) {
        val schema = StructField("foo", IntegerType, true)
        println(SizeEstimator.estimate(schema))
        // 271872172
      }
    }
    
    Run Code Online (Sandbox Code Playgroud)

  • 我个人不会首先在Rows上操作.结构不是很优雅.但除此之外 - 打败了我.我将不得不深入挖掘,但拖延所有这些看起来并不是一个好主意. (2认同)