use*_*018 13 scala apache-spark apache-spark-sql
我有多个模式,如下所示,具有不同的列名和数据类型.我想使用DataFrameScala为每个模式生成测试/模拟数据,并将其保存到镶木地板文件中.
下面是示例模式(来自示例json),以动态生成数据,其中包含虚拟值.
val schema1 = StructType(
List(
StructField("a", DoubleType, true),
StructField("aa", StringType, true)
StructField("p", LongType, true),
StructField("pp", StringType, true)
)
)
Run Code Online (Sandbox Code Playgroud)
我需要像这样的rdd/dataframe,每行1000个,基于上面模式中的列数.
val data = Seq(
Row(1d, "happy", 1L, "Iam"),
Row(2d, "sad", 2L, "Iam"),
Row(3d, "glad", 3L, "Iam")
)
Run Code Online (Sandbox Code Playgroud)
基本上......就像我需要动态生成数据的200个数据集一样,为每个方案编写单独的程序对我来说是不可能的.
PLS.帮助我你的想法或impl.因为我是新手.
是否可以根据不同类型的模式生成动态数据?
使用@ JacekLaskowski的建议,您可以使用基于您期望的字段/类型的ScalaCheck(Gen)生成器生成动态数据.
它可能看起来像这样:
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode}
import org.scalacheck._
import scala.collection.JavaConverters._
val dynamicValues: Map[(String, DataType), Gen[Any]] = Map(
("a", DoubleType) -> Gen.choose(0.0, 100.0),
("aa", StringType) -> Gen.oneOf("happy", "sad", "glad"),
("p", LongType) -> Gen.choose(0L, 10L),
("pp", StringType) -> Gen.oneOf("Iam", "You're")
)
val schemas = Map(
"schema1" -> StructType(
List(
StructField("a", DoubleType, true),
StructField("aa", StringType, true),
StructField("p", LongType, true),
StructField("pp", StringType, true)
)),
"schema2" -> StructType(
List(
StructField("a", DoubleType, true),
StructField("pp", StringType, true),
StructField("p", LongType, true)
)
)
)
val numRecords = 1000
schemas.foreach {
case (name, schema) =>
// create a data frame
spark.createDataFrame(
// of #numRecords records
(0 until numRecords).map { _ =>
// each of them a row
Row.fromSeq(schema.fields.map(field => {
// with fields based on the schema's fieldname & type else null
dynamicValues.get((field.name, field.dataType)).flatMap(_.sample).orNull
}))
}.asJava, schema)
// store to parquet
.write.mode(SaveMode.Overwrite).parquet(name)
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1062 次 |
| 最近记录: |