raj*_*mar 9 scala dataframe apache-spark apache-spark-sql
sql/dataframes, please help me out or offer some good advice on how to read this JSON:
{
  "billdate":"2016-08-08',
  "accountid":"xxx"
  "accountdetails":{
    "total":"1.1"
    "category":[
      {
        "desc":"one",
        "currentinfo":{
          "value":"10"
        },
        "subcategory":[
          {
            "categoryDesc":"sub",
            "value":"10",
            "currentinfo":{
              "value":"10"
            }
          }]
      }]
  }
}
Thanks,
小智 11
You can try the following code to read a JSON file based on its schema in Spark 2.2:
import org.apache.spark.sql.types.{DataType, StructType}

// Read the JSON once and extract its schema as a JSON string
val schema_json = spark.read.json("/user/Files/ActualJson.json").schema.json

// Rebuild a StructType from that schema string
val newSchema = DataType.fromJson(schema_json).asInstanceOf[StructType]

// Read the JSON files using the explicit schema
val df = spark.read.schema(newSchema).json("Json_Files/Folder Path")
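As a sketch of how the question's document could then be read: assuming the JSON above has its syntax fixed (a matching quote on billdate, commas between fields) and is stored one object per line at the path from the snippet above, the nested category and subcategory arrays can be flattened with the explode() column function (Spark 2.x):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

// Sketch only: assumes the JSON above has been corrected to valid syntax
// and sits one object per line at this path.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._  // enables the $"..." column syntax

val bills = spark.read.json("/user/Files/ActualJson.json")

// Flatten the nested category and subcategory arrays step by step.
val flat = bills
  .select($"billdate", $"accountid", explode($"accountdetails.category").as("cat"))
  .select($"billdate", $"accountid", $"cat.desc", explode($"cat.subcategory").as("sub"))
  .select($"billdate", $"accountid", $"desc", $"sub.categoryDesc", $"sub.value")

flat.printSchema()
flat.show()
```

Each select keeps the scalar columns and explodes one level of nesting, so the result is one row per subcategory entry.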
It looks like your JSON is invalid. Please check it at http://www.jsoneditoronline.org/
See the introduction at json-support-in-spark-sql.html
If you want to register it as a table, you can register it as below and then print the schema.
DataFrame df = sqlContext.read().json("/path/to/validjsonfile").toDF();
df.registerTempTable("df");
df.printSchema();
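Once registered, the temp table can be queried with plain SQL; a minimal sketch (shown in Scala, with column names assumed from the JSON in the question):

```scala
// Query the registered temp table "df" with SQL.
// "accountid" and "billdate" are assumed from the question's JSON.
val result = sqlContext.sql("SELECT accountid, billdate FROM df")
result.show()
```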
Below is a sample code snippet:
DataFrame app = df.select("toplevel");
app.registerTempTable("toplevel");
app.printSchema();
app.show();
DataFrame appName = app.select("toplevel.sublevel");
appName.registerTempTable("sublevel");
appName.printSchema();
appName.show();
{"name":"Michael", "cities":["palo alto", "menlo park"], "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}
{"name":"Andy", "cities":["santa cruz"], "schools":[{"sname":"ucsb", "year":2011}]}
{"name":"Justin", "cities":["portland"], "schools":[{"sname":"berkeley", "year":2014}]}
val people = sqlContext.read.json("people.json")
people: org.apache.spark.sql.DataFrame
val names = people.select('name).collect()
names: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])
names.map(row => row.getString(0))
res88: Array[String] = Array(Michael, Andy, Justin)
Use the select() method to pick the top-level field, collect() to gather it into an Array[Row], and the getString() method to access a column inside each Row.
Each person has an array of "cities". Let's flatten those arrays and read out all of their elements.
val flattened = people.explode("cities", "city"){c: List[String] => c}
flattened: org.apache.spark.sql.DataFrame
val allCities = flattened.select('city).collect()
allCities: Array[org.apache.spark.sql.Row]
allCities.map(row => row.getString(0))
res92: Array[String] = Array(palo alto, menlo park, santa cruz, portland)
The explode() method unpacks, or flattens, the cities array into a new column named "city". We then use select() to pick the new column, collect() to gather it into an Array[Row], and getString() to access the data inside each Row.
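Note that DataFrame.explode is deprecated as of Spark 2.0; the same flattening can be sketched with the explode() column function instead:

```scala
import org.apache.spark.sql.functions.explode

// One row per city; the $"..." syntax assumes spark-shell or
// `import spark.implicits._` being in scope.
val flattenedCities = people.select(explode($"cities").as("city"))
flattenedCities.collect().map(_.getString(0))
```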
Next, read out the "schools" data, an array of nested JSON objects. Each element of the array holds a school name and a year:
val schools = people.select('schools).collect()
schools: Array[org.apache.spark.sql.Row]
val schoolsArr = schools.map(row => row.getSeq[org.apache.spark.sql.Row](0))
schoolsArr: Array[Seq[org.apache.spark.sql.Row]]
schoolsArr.foreach(schools => {
schools.map(row => print(row.getString(0), row.getLong(1)))
print("\n")
})
(stanford,2010)(berkeley,2012)
(ucsb,2011)
(berkeley,2014)
We use select() and collect() to select the "schools" array and gather it into an Array[Row]. Each "schools" element is itself a sequence of Rows, so we read it out with the getSeq[Row]() method. Finally, we read each school's information by calling getString() for the school name and getLong() for the year.
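An alternative sketch, again using the explode() column function: flatten schools into one row per school and select the nested fields by name, which avoids the positional getString/getLong calls entirely:

```scala
import org.apache.spark.sql.functions.explode

// One row per school, then pull the nested fields out by name.
// The $"..." syntax assumes spark-shell or `import spark.implicits._`.
val schoolsDf = people.select(explode($"schools").as("school"))
  .select($"school.sname", $"school.year")
schoolsDf.show()
```

Selecting by field name also keeps the code working if the order of fields in the JSON changes.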