Ole*_*ikh 51 scala dataframe apache-spark
是否有可能以及将列添加到数据框的最有效的方法是什么?
更具体地,列可以用作现有数据帧的行ID.
在简化的情况下,从文件读取而不是标记它,我可以想到如下(在Scala中),但它完成了错误(在第3行),并且无论如何看起来不是最好的路线:
var dataDF = sc.textFile("path/file").toDF()
val rowDF = sc.parallelize(1 to DataDF.count().toInt).toDF("ID")
dataDF = dataDF.withColumn("ID", rowDF("ID"))
Run Code Online (Sandbox Code Playgroud)
Ole*_*ikh 51
我发布这个问题已经有一段时间了,似乎其他一些人也希望得到答案.以下是我发现的.
因此,最初的任务是将具有行标识符(基本上是序列1 to numRows
)的列附加到任何给定的数据帧,因此可以跟踪行的顺序/存在(例如,当您进行采样时).这可以通过以下方式实现:
sqlContext.textFile(file).
zipWithIndex().
map(case(d, i)=>i.toString + delimiter + d).
map(_.split(delimiter)).
map(s=>Row.fromSeq(s.toSeq))
Run Code Online (Sandbox Code Playgroud)
关于将任何列附加到任何数据框的一般情况:
Spark API中withColumn
与此功能"最接近"的是和withColumnRenamed
.根据Scala文档,前者通过添加列返回新的DataFrame.在我看来,这是一个有点混乱和不完整的定义.这两个函数都只能在this
数据帧上运行,即给定两个数据帧df1
和df2
列col
:
val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL
Run Code Online (Sandbox Code Playgroud)
因此,除非您能够将现有数据框中的列转换为所需的形状,否则不能使用withColumn
或withColumnRenamed
附加任意列(独立列或其他数据框).
正如上面评论的那样,解决方法可能是使用join
- 这可能会非常混乱,尽管可能 - 将上面的唯一键附加zipWithIndex
到数据框或列可能会起作用.虽然效率是......
很明显,在数据框中附加一个列并不是分布式环境的简单功能,可能根本就没有非常有效,简洁的方法.但我认为,即使有性能警告,提供此核心功能仍然非常重要.
Tal*_*ffe 30
不确定它是否在spark 1.3中工作但在spark 1.5中我使用withColumn:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
df.withColumn("newName",lit("newValue"))
Run Code Online (Sandbox Code Playgroud)
当我需要使用与数据帧的现有列无关的值时,我使用它
这类似于@ NehaM的答案,但更简单
我从上面的答案中得到了帮助.但是,如果我们想要更改a DataFrame
并且当前的API稍有不同,我发现它不完整Spark 1.6
.
zipWithIndex()
返回Tuple
的(Row, Long)
包含每一行和对应的索引.我们可以Row
根据自己的需要使用它来创造新的.
val rdd = df.rdd.zipWithIndex()
.map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq))
val newstructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields))
sqlContext.createDataFrame(rdd, newstructure ).show
Run Code Online (Sandbox Code Playgroud)
我希望这会有所帮助.
归档时间: |
|
查看次数: |
80763 次 |
最近记录: |