I am trying to programmatically enforce a schema on a textFile that contains JSON. I tried jsonFile, but the problem is that to create a DataFrame from a list of JSON files, Spark has to make one pass over the data just to infer the schema. So it ends up parsing all of the data, which takes far too long (4 hours, since my data is gzipped and TBs in size). Instead, I want to read it as a textFile and enforce a schema that picks out only the fields of interest, so I can query the resulting DataFrame later. But I don't know how to map the input onto the schema. Can someone give me a reference for how to map a schema onto JSON input like this?
Input:
This is the complete schema:
records: org.apache.spark.sql.DataFrame = [country: string, countryFeatures: string, customerId: string, homeCountry: string, homeCountryFeatures: string, places: array<struct<freeTrial:boolean,placeId:string,placeRating:bigint>>, siteName: string, siteId: string, siteTypeId: string, Timestamp: bigint, Timezone: string, countryId: string, pageId: string, homeId: string, pageType: string, model: string, requestId: string, sessionId: string, inputs: array<struct<inputName:string,inputType:string,inputId:string,offerType:string,originalRating:bigint,processed:boolean,rating:bigint,score:double,methodId:string>>]
But I am only interested in a few of these fields. A sample record:
res45: Array[String] = Array({"requestId":"bnjinmm","siteName":"bueller","pageType":"ad","model":"prepare","inputs":[{"methodId":"436136582","inputType":"US","processed":true,"rating":0,"originalRating":1},{"methodId":"23232322","inputType":"UK","processed":false,"rating":0,"originalRating":1}]
val records = sc.textFile("s3://testData/sample.json.gz")

val schema = StructType(Array(
  StructField("requestId", StringType, true),
  StructField("siteName", StringType, true),
  StructField("model", StringType, true),
  StructField("pageType", StringType, true),
  StructField("inputs", ArrayType(
    StructType(
      StructField("inputType", StringType, true),
      StructField("originalRating", LongType, true),
      StructField("processed", BooleanType, true),
      StructField("rating", LongType, true),
      StructField("methodId", StringType, true)
    ), true), true)))

val rowRDD = ??
val inputRDD = sqlContext.applySchema(rowRDD, schema)
inputRDD.registerTempTable("input")
sql("select * from input").foreach(println)
Is there a way to do this mapping? Or do I need to use a JSON parser or something? I only want to use textFile because of the constraints described above.
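One possible way to fill in the rowRDD gap (a rough sketch, assuming the input is one JSON object per line and that using json4s, which ships with Spark, is acceptable as the parser; extractOpt turns missing fields into nulls, and the boxing calls let primitive values sit in a Row alongside nulls):

import org.apache.spark.sql.Row
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Sketch: turn each JSON line into a Row matching the schema above.
val rowRDD = records.map { line =>
  implicit val formats = DefaultFormats
  val json = parse(line)
  // Build nested Rows for the "inputs" array; fall back to empty on absence.
  val inputs = (json \ "inputs") match {
    case JArray(items) => items.map { in =>
      Row(
        (in \ "inputType").extractOpt[String].orNull,
        (in \ "originalRating").extractOpt[Long].map(Long.box).orNull,
        (in \ "processed").extractOpt[Boolean].map(Boolean.box).orNull,
        (in \ "rating").extractOpt[Long].map(Long.box).orNull,
        (in \ "methodId").extractOpt[String].orNull
      )
    }
    case _ => Seq.empty[Row]
  }
  // Top-level Row in the same order as the StructType fields.
  Row(
    (json \ "requestId").extractOpt[String].orNull,
    (json \ "siteName").extractOpt[String].orNull,
    (json \ "model").extractOpt[String].orNull,
    (json \ "pageType").extractOpt[String].orNull,
    inputs
  )
}

rowRDD could then be passed to sqlContext.applySchema(rowRDD, schema) as above, provided the schema itself is constructed with the Array(...) wrapper shown in the answer below.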
Tried:
val records =sqlContext.read.schema(schema).json("s3://testData/test2.gz")
But I keep getting this error:
<console>:37: error: overloaded method value apply with alternatives:
(fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
(fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
(fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField)
StructField("inputs",ArrayType(StructType(StructField("inputType",StringType,true), StructField("originalRating",LongType,true), StructField("processed",BooleanType,true), StructField("rating",LongType,true), StructField("score",DoubleType,true), StructField("methodId",StringType,true)),true),true)))
^
The data can be loaded with the following code using a predefined schema; Spark does not need to make an extra pass over the gzipped files to infer it. The code in the question is the problem: the inner StructType is applied directly to bare StructFields instead of an Array(...) of them, which is exactly what the overloaded-method error above is complaining about.
import org.apache.spark.sql.types._

val input = StructType(Array(
  StructField("inputType", StringType, true),
  StructField("originalRating", LongType, true),
  StructField("processed", BooleanType, true),
  StructField("rating", LongType, true),
  StructField("score", DoubleType, true),
  StructField("methodId", StringType, true)
))

val schema = StructType(Array(
  StructField("requestId", StringType, true),
  StructField("siteName", StringType, true),
  StructField("model", StringType, true),
  StructField("inputs", ArrayType(input, true), true)
))

val records = sqlContext.read.schema(schema).json("s3://testData/test2.gz")
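The resulting DataFrame can then be registered and queried as in the question, for example:

records.registerTempTable("input")
sqlContext.sql("select requestId, siteName, inputs from input").show(5)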
Not all fields need to be provided, though it is good to provide all of them if possible. Spark does its best to parse every line even when some are invalid: if the input is a JSON-lines file, each malformed line is surfaced through a _corrupt_record column containing the whole line.
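A minimal sketch of catching bad lines this way (assuming Spark 1.x as in the question; _corrupt_record is Spark's default name for this column, configurable via spark.sql.columnNameOfCorruptRecord, and it must be included in the user-supplied schema to show up):

// Append the corrupt-record column to the schema so bad lines are kept.
val schemaWithCorrupt = StructType(schema.fields :+ StructField("_corrupt_record", StringType, true))
val checked = sqlContext.read.schema(schemaWithCorrupt).json("s3://testData/test2.gz")
// Rows that failed to parse have everything null except _corrupt_record.
checked.filter("_corrupt_record is not null").show()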