arj*_*nes 6 apache-spark apache-spark-sql
我正在使用新的Apache Spark版本1.4.0数据帧API从Twitter的状态JSON中提取信息,主要集中在实体对象上 - 此问题的相关部分如下所示:
{
...
...
"entities": {
"hashtags": [],
"trends": [],
"urls": [],
"user_mentions": [
{
"screen_name": "linobocchini",
"name": "Lino Bocchini",
"id": 187356243,
"id_str": "187356243",
"indices": [ 3, 16 ]
},
{
"screen_name": "jeanwyllys_real",
"name": "Jean Wyllys",
"id": 111123176,
"id_str": "111123176",
"indices": [ 79, 95 ]
}
],
"symbols": []
},
...
...
}
Run Code Online (Sandbox Code Playgroud)
对于如何从基元类型作为提取信息的几个例子string,integer等等-但我无法找到如何处理这些类型的任何复杂的结构.
我尝试了下面的代码,但它仍然无法正常工作,它会引发异常
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val tweets = sqlContext.read.json("tweets.json")
// this function is just to filter empty entities.user_mentions[] nodes
// some tweets doesn't contains any mentions
import org.apache.spark.sql.functions.udf
val isEmpty = udf((value: List[Any]) => value.isEmpty)
import org.apache.spark.sql._
import sqlContext.implicits._
case class UserMention(id: Long, idStr: String, indices: Array[Long], name: String, screenName: String)
val mentions = tweets.select("entities.user_mentions").
filter(!isEmpty($"user_mentions")).
explode($"user_mentions") {
case Row(arr: Array[Row]) => arr.map { elem =>
UserMention(
elem.getAs[Long]("id"),
elem.getAs[String]("is_str"),
elem.getAs[Array[Long]]("indices"),
elem.getAs[String]("name"),
elem.getAs[String]("screen_name"))
}
}
mentions.first
Run Code Online (Sandbox Code Playgroud)
我试图打电话时出现例外情况mentions.first:
scala> mentions.first
15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [111123176,111123176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)
Run Code Online (Sandbox Code Playgroud)
作为附加上下文,自动映射的结构是:
scala> mentions.printSchema
root
|-- user_mentions: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: long (nullable = true)
| | |-- id_str: string (nullable = true)
| | |-- indices: array (nullable = true)
| | | |-- element: long (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- screen_name: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
注1:我知道有可能使用它来解决这个问题,HiveQL但是一旦有很多动力,我想使用数据帧.
SELECT explode(entities.user_mentions) as mentions
FROM tweets
Run Code Online (Sandbox Code Playgroud)
注2:该UDF val isEmpty = udf((value: List[Any]) => value.isEmpty)是一个丑陋的黑客,我在这里失去了一些东西,但我想出了避免NPE的唯一途径
小智 4
这是一个有效的解决方案,只需一个小技巧即可。
主要思想是通过声明 List[String] 而不是 List[Row] 来解决类型问题:
val mentions = tweets.explode("entities.user_mentions", "mention"){m: List[String] => m}
Run Code Online (Sandbox Code Playgroud)
这将创建第二个名为“mention”、类型为“Struct”的列:
| entities| mention|
+--------------------+--------------------+
|[List(),List(),Li...|[187356243,187356...|
|[List(),List(),Li...|[111123176,111123...|
Run Code Online (Sandbox Code Playgroud)
现在执行一个map()来提取mention中的字段。getStruct(1) 调用获取每行第 1 列中的值:
case class Mention(id: Long, id_str: String, indices: Seq[Int], name: String, screen_name: String)
val mentionsRdd = mentions.map(
row =>
{
val mention = row.getStruct(1)
Mention(mention.getLong(0), mention.getString(1), mention.getSeq[Int](2), mention.getString(3), mention.getString(4))
}
)
Run Code Online (Sandbox Code Playgroud)
并将 RDD 转换回 DataFrame:
val mentionsDf = mentionsRdd.toDF()
Run Code Online (Sandbox Code Playgroud)
就这样吧!
| id| id_str| indices| name| screen_name|
+---------+---------+------------+-------------+---------------+
|187356243|187356243| List(3, 16)|Lino Bocchini| linobocchini|
|111123176|111123176|List(79, 95)| Jean Wyllys|jeanwyllys_real|
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3148 次 |
| 最近记录: |