如何基于模式动态生成数据集?

use*_*018 13 scala apache-spark apache-spark-sql

我有多个模式,如下所示,具有不同的列名和数据类型.我想使用DataFrameScala为每个模式生成测试/模拟数据,并将其保存到镶木地板文件中.

下面是示例模式(来自示例json),以动态生成数据,其中包含虚拟值.

val schema1 = StructType(
  List(
    StructField("a", DoubleType, true),
    StructField("aa", StringType, true)
    StructField("p", LongType, true),
    StructField("pp", StringType, true)
  )
)
Run Code Online (Sandbox Code Playgroud)

我需要像这样的rdd/dataframe,每行1000个,基于上面模式中的列数.

val data = Seq(
  Row(1d, "happy", 1L, "Iam"),
  Row(2d, "sad", 2L, "Iam"),
  Row(3d, "glad", 3L, "Iam")
)
Run Code Online (Sandbox Code Playgroud)

基本上......就像我需要动态生成数据的200个数据集一样,为每个方案编写单独的程序对我来说是不可能的.

PLS.帮助我你的想法或impl.因为我是新手.

是否可以根据不同类型的模式生成动态数据?

Tom*_*ous 9

使用@ JacekLaskowski的建议,您可以使用基于您期望的字段/类型的ScalaCheck(Gen)生成器生成动态数据.

它可能看起来像这样:

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode}
import org.scalacheck._

import scala.collection.JavaConverters._

val dynamicValues: Map[(String, DataType), Gen[Any]] = Map(
  ("a", DoubleType) -> Gen.choose(0.0, 100.0),
  ("aa", StringType) -> Gen.oneOf("happy", "sad", "glad"),
  ("p", LongType) -> Gen.choose(0L, 10L),
  ("pp", StringType) -> Gen.oneOf("Iam", "You're")
)

val schemas = Map(
  "schema1" -> StructType(
    List(
      StructField("a", DoubleType, true),
      StructField("aa", StringType, true),
      StructField("p", LongType, true),
      StructField("pp", StringType, true)
    )),
  "schema2" -> StructType(
    List(
      StructField("a", DoubleType, true),
      StructField("pp", StringType, true),
      StructField("p", LongType, true)
    )
  )
)

val numRecords = 1000

schemas.foreach {
  case (name, schema) =>
    // create a data frame
    spark.createDataFrame(
      // of #numRecords records
      (0 until numRecords).map { _ =>
        // each of them a row
        Row.fromSeq(schema.fields.map(field => {
          // with fields based on the schema's fieldname & type else null
          dynamicValues.get((field.name, field.dataType)).flatMap(_.sample).orNull
        }))
      }.asJava, schema)
      // store to parquet
      .write.mode(SaveMode.Overwrite).parquet(name)
}
Run Code Online (Sandbox Code Playgroud)