在Apache Spark 1.3中向Data Frame附加一列

Ole*_*ikh 51 scala dataframe apache-spark

是否有可能以及将列添加到数据框的最有效的方法是什么?

更具体地,列可以用作现有数据帧的行ID.

在简化的情况下,从文件读取而不是标记它,我可以想到如下(在Scala中),但它完成了错误(在第3行),并且无论如何看起来不是最好的路线:

var dataDF = sc.textFile("path/file").toDF() 
val rowDF = sc.parallelize(1 to DataDF.count().toInt).toDF("ID") 
dataDF = dataDF.withColumn("ID", rowDF("ID")) 
Run Code Online (Sandbox Code Playgroud)

Ole*_*ikh 51

我发布这个问题已经有一段时间了,似乎其他一些人也希望得到答案.以下是我发现的.

因此,最初的任务是将具有行标识符(基本上是序列1 to numRows)的列附加到任何给定的数据帧,因此可以跟踪行的顺序/存在(例如,当您进行采样时).这可以通过以下方式实现:

sqlContext.textFile(file).
zipWithIndex().
map(case(d, i)=>i.toString + delimiter + d).
map(_.split(delimiter)).
map(s=>Row.fromSeq(s.toSeq))
Run Code Online (Sandbox Code Playgroud)

关于将任何列附加到任何数据框的一般情况:

Spark API中withColumn与此功能"最接近"的是和withColumnRenamed.根据Scala文档,前者通过添加列返回新的DataFrame.在我看来,这是一个有点混乱和不完整的定义.这两个函数都只能在this数据帧上运行,即给定两个数据帧df1df2col:

val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL
Run Code Online (Sandbox Code Playgroud)

因此,除非您能够将现有数据框中的列转换为所需的形状,否则不能使用withColumnwithColumnRenamed附加任意列(独立列或其他数据框).

正如上面评论的那样,解决方法可能是使用join- 这可能会非常混乱,尽管可能 - 将上面的唯一键附加zipWithIndex到数据框或列可能会起作用.虽然效率是......

很明显,在数据框中附加一个列并不是分布式环境的简单功能,可能根本就没有非常有效,简洁的方法.但我认为,即使有性能警告,提供此核心功能仍然非常重要.

  • 好答案!如果Spark知道我正在加入*sorted*键,则可以优化追加列功能.这将是一个巨大的性能提升. (2认同)
  • 如果你想要附加一个id列,你应该看看函数monotonically_increasing_id(),它可以在withColumn中使用. (2认同)

Tal*_*ffe 30

不确定它是否在spark 1.3中工作但在spark 1.5中我使用withColumn:

import sqlContext.implicits._
import org.apache.spark.sql.functions._


df.withColumn("newName",lit("newValue"))
Run Code Online (Sandbox Code Playgroud)

当我需要使用与数据帧的现有列无关的值时,我使用它

这类似于@ NehaM的答案,但更简单


Neh*_*haM 6

我从上面的答案中得到了帮助.但是,如果我们想要更改a DataFrame并且当前的API稍有不同,我发现它不完整Spark 1.6. zipWithIndex()返回Tuple(Row, Long)包含每一行和对应的索引.我们可以Row根据自己的需要使用它来创造新的.

val rdd = df.rdd.zipWithIndex()
             .map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq))
val newstructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields))
sqlContext.createDataFrame(rdd, newstructure ).show
Run Code Online (Sandbox Code Playgroud)

我希望这会有所帮助.