Mat*_*yra 12 scala dataframe apache-spark apache-spark-sql
我有一排火花DF Seq[(String, String, String)]
.我正试图做某种flatMap
事情,但我做的任何尝试最终都会抛出
java.lang.ClassCastException:org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema无法强制转换为scala.Tuple3
我可以从DF中获取单行或多行就好了
df.map{ r => r.getSeq[Feature](1)}.first
Run Code Online (Sandbox Code Playgroud)
回报
Seq[(String, String, String)] = WrappedArray([ancient,jj,o], [olympia_greece,nn,location] .....
Run Code Online (Sandbox Code Playgroud)
并且RDD的数据类型似乎是正确的.
org.apache.spark.rdd.RDD[Seq[(String, String, String)]]
df的架构是
root
|-- article_id: long (nullable = true)
|-- content_processed: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- lemma: string (nullable = true)
| | |-- pos_tag: string (nullable = true)
| | |-- ne_tag: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
我知道这个问题与处理RDD行的spark sql有关,org.apache.spark.sql.Row
即使他们愚蠢地说它是一个Seq[(String, String, String)]
.有一个相关的问题(链接如下),但这个问题的答案对我不起作用.我也不熟悉火花来弄清楚如何将它变成一个有效的解决方案.
行Row[Seq[(String, String, String)]]
或者Row[(String, String, String)]
或者Seq[Row[(String, String, String)]]
甚至是更疯狂的行.
我正在尝试做类似的事情
df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1)
Run Code Online (Sandbox Code Playgroud)
这似乎有效,但实际上并没有
df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1).first
Run Code Online (Sandbox Code Playgroud)
抛出上述错误.那么我应该如何(例如)在每一行获得第二个元组的第一个元素?
另外为什么火花被设计为这样做,声称事物是一种类型似乎是愚蠢的,而实际上它不是也不能转换为声称的类型.
相关问题:GenericRowWithSchema异常将DataBuffer中的HashSet转换为来自Hive表的RDD中的HashSet
相关错误报告:http://search-hadoop.com/m/q3RTt2bvwy19Dxuq1&subj=ClassCastException+when+extracting+and+collecting+DF+array+column+type
zer*_*323 17
嗯,它并不声称它是一个元组.它声称它是一个struct
映射到Row
:
import org.apache.spark.sql.Row
case class Feature(lemma: String, pos_tag: String, ne_tag: String)
case class Record(id: Long, content_processed: Seq[Feature])
val df = Seq(
Record(1L, Seq(
Feature("ancient", "jj", "o"),
Feature("olympia_greece", "nn", "location")
))
).toDF
val content = df.select($"content_processed").rdd.map(_.getSeq[Row](0))
Run Code Online (Sandbox Code Playgroud)
您将在Spark SQL编程指南中找到确切的映射规则.
由于Row
结构不完全漂亮,您可能希望将其映射到有用的东西:
content.map(_.map {
case Row(lemma: String, pos_tag: String, ne_tag: String) =>
(lemma, pos_tag, ne_tag)
})
Run Code Online (Sandbox Code Playgroud)
要么:
content.map(_.map ( row => (
row.getAs[String]("lemma"),
row.getAs[String]("pos_tag"),
row.getAs[String]("ne_tag")
)))
Run Code Online (Sandbox Code Playgroud)
最后一个稍微简洁的方法Datasets
:
df.as[Record].rdd.map(_.content_processed)
Run Code Online (Sandbox Code Playgroud)
要么
df.select($"content_processed").as[Seq[(String, String, String)]]
Run Code Online (Sandbox Code Playgroud)
虽然这个时刻似乎有些小问题.
第一种方法(Row.getAs
)和第二种方法()有重要区别Dataset.as
.前者提取对象Any
并适用asInstanceOf
.后者使用编码器在内部类型和所需表示之间进行转换.