迭代Spark数据帧中的行和列

ps0*_*604 17 scala apache-spark apache-spark-sql spark-dataframe

我有以下动态创建的Spark数据帧:

val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)

val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)

val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)

val data = Seq(row1,row2,row3)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
Run Code Online (Sandbox Code Playgroud)

现在,我需要迭代每一行和sqlDF每列打印每一列,这是我的尝试:

sqlDF.foreach { row =>
  row.foreach { col => println(col) }
}
Run Code Online (Sandbox Code Playgroud)

row是类型Row,但不可迭代,这就是为什么此代码抛出编译错误row.foreach.如何迭代每列Row

Sar*_*avu 14

考虑你有Dataframe以下情况

+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy|   aaa| 20|
|Berta|   bbb| 30|
|  Joe|   ccc| 40|
+-----+------+---+
Run Code Online (Sandbox Code Playgroud)

要循环数据帧和提取的元素数据框,foreach将没有直接的帮助.要实现它,您可以选择以下方法之一.

方法1 - 使用rdd循环

使用case class您的顶部数据帧.该rdd.collect变量将包含的各行数据帧row行类型.要从行中获取每个元素,请使用rdd其中包含逗号分隔值中每行的值.使用row.mkString(",")函数(内置函数),您可以split使用索引访问行的每个列值.

import spark.implicits._
import org.apache.spark.sql._
case class cls_Employee(name:String, sector:String, age:Int)
val df = Seq(cls_Employee("Andy","aaa", 20), cls_Employee("Berta","bbb", 30), cls_Employee("Joe","ccc", 40)).toDF()
df.as[cls_Employee].take(df.count.toInt).foreach(t => println(s"name=${t.name},sector=${t.sector},age=${t.age}"))
Run Code Online (Sandbox Code Playgroud)

方法2 - 使用where并选择

您可以直接使用rdd,,并在内部循环并查找数据.由于它不应该抛出索引超出绑定的异常,因此使用if条件

for (row <- df.rdd.collect)
{   
    var name = row.mkString(",").split(",")(0)
    var sector = row.mkString(",").split(",")(1)
    var age = row.mkString(",").split(",")(2)   
}
Run Code Online (Sandbox Code Playgroud)

方法3 - 使用临时表

您可以将数据帧注册为temptable,它将存储在spark的内存中.然后,您可以像使用其他数据库一样使用select查询来查询数据,然后收集并保存在变量中

if(df.where($"name" === "Andy").select(col("name")).collect().length >= 1)
    name = df.where($"name" === "Andy").select(col("name")).collect()(0).get(0).toString
Run Code Online (Sandbox Code Playgroud)


SCo*_*uto 10

您可以转换RowSeq使用toSeq.一旦转向,Seq您可以像往常一样迭代它foreach,map或者您需要的任何东西

    sqlDF.foreach { row => 
           row.toSeq.foreach{col => println(col) }
    }
Run Code Online (Sandbox Code Playgroud)

输出:

Berta
bbb
30
Joe
Andy
aaa
20
ccc
40
Run Code Online (Sandbox Code Playgroud)

  • 这个答案对我不起作用,但 @Sarath Avanavu 答案中的方法 1 有效。 (2认同)
  • 是的,这是不稳定的,因为将记录转换为有用的东西很容易失败,即使 Spark 对数据帧的标准处理自动管理到甚至案例类的转换。 (2认同)

Rap*_*oth 6

你应该mkString在你的Row

sqlDF.foreach { row =>
  println(row.mkString(",")) 
}
Run Code Online (Sandbox Code Playgroud)

但请注意,这将打印在执行程序 JVM 中,因此通常您不会看到输出(除非您使用 master = local)


Nar*_*shi 6

sqlDF.foreach不适合我,但 @Sarath Avanavu 答案中的方法 1 有效,但有时它也会处理记录的顺序。

我发现了另一种有效的方法

df.collect().foreach { row =>
   println(row.mkString(","))
}
Run Code Online (Sandbox Code Playgroud)