从spark DataFrame中提取`Seq [(String,String,String)]`

Mat*_*yra 12 scala dataframe apache-spark apache-spark-sql

我有一排火花DF Seq[(String, String, String)].我正试图做某种flatMap事情,但我做的任何尝试最终都会抛出

java.lang.ClassCastException:org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema无法强制转换为scala.Tuple3

我可以从DF中获取单行或多行就好了

df.map{ r => r.getSeq[Feature](1)}.first
Run Code Online (Sandbox Code Playgroud)

回报

Seq[(String, String, String)] = WrappedArray([ancient,jj,o], [olympia_greece,nn,location] .....
Run Code Online (Sandbox Code Playgroud)

并且RDD的数据类型似乎是正确的.

org.apache.spark.rdd.RDD[Seq[(String, String, String)]]

df的架构是

root
 |-- article_id: long (nullable = true)
 |-- content_processed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- lemma: string (nullable = true)
 |    |    |-- pos_tag: string (nullable = true)
 |    |    |-- ne_tag: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我知道这个问题与处理RDD行的spark sql有关,org.apache.spark.sql.Row即使他们愚蠢地说它是一个Seq[(String, String, String)].有一个相关的问题(链接如下),但这个问题的答案对我不起作用.我也不熟悉火花来弄清楚如何将它变成一个有效的解决方案.

Row[Seq[(String, String, String)]]或者Row[(String, String, String)]或者Seq[Row[(String, String, String)]]甚至是更疯狂的行.

我正在尝试做类似的事情

df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1)
Run Code Online (Sandbox Code Playgroud)

这似乎有效,但实际上并没有

df.map{ r => r.getSeq[Feature](1)}.map(_(1)._1).first
Run Code Online (Sandbox Code Playgroud)

抛出上述错误.那么我应该如何(例如)在每一行获得第二个元组的第一个元素?

另外为什么火花被设计为这样做,声称事物是一种类型似乎是愚蠢的,而实际上它不是也不能转换为声称的类型.


相关问题:GenericRowWithSchema异常将DataBuffer中的HashSet转换为来自Hive表的RDD中的HashSet

相关错误报告:http://search-hadoop.com/m/q3RTt2bvwy19Dxuq1&subj=ClassCastException+when+extracting+and+collecting+DF+array+column+type

zer*_*323 17

嗯,它并不声称它是一个元组.它声称它是一个struct映射到Row:

import org.apache.spark.sql.Row

case class Feature(lemma: String, pos_tag: String, ne_tag: String)
case class Record(id: Long, content_processed: Seq[Feature])

val df = Seq(
  Record(1L, Seq(
    Feature("ancient", "jj", "o"),
    Feature("olympia_greece", "nn", "location")
  ))
).toDF

val content = df.select($"content_processed").rdd.map(_.getSeq[Row](0))
Run Code Online (Sandbox Code Playgroud)

您将在Spark SQL编程指南中找到确切的映射规则.

由于Row结构不完全漂亮,您可能希望将其映射到有用的东西:

content.map(_.map {
  case Row(lemma: String, pos_tag: String, ne_tag: String) => 
    (lemma, pos_tag, ne_tag)
})
Run Code Online (Sandbox Code Playgroud)

要么:

content.map(_.map ( row => (
  row.getAs[String]("lemma"),
  row.getAs[String]("pos_tag"),
  row.getAs[String]("ne_tag")
)))
Run Code Online (Sandbox Code Playgroud)

最后一个稍微简洁的方法Datasets:

df.as[Record].rdd.map(_.content_processed)
Run Code Online (Sandbox Code Playgroud)

要么

df.select($"content_processed").as[Seq[(String, String, String)]]
Run Code Online (Sandbox Code Playgroud)

虽然这个时刻似乎有些小问题.

第一种方法(Row.getAs)和第二种方法()有重要区别Dataset.as.前者提取对象Any并适用asInstanceOf.后者使用编码器在内部类型和所需表示之间进行转换.