Scala Spark:重命名大量列的性能问题

Boe*_*ern 3 csv scala apache-spark

为了能够在DataFrame不转义的情况下使用.我的列名,我需要一个函数来“验证”所有列名 - 但我尝试过的所有方法都没有及时完成这项工作(我在 5 分钟后中止)。

我正在尝试我的算法的数据集是 golub 数据集(在此处获取)。这是一个 2.2MB 的 CSV 文件,有 7200 列。重命名所有列应该是几秒钟的事情

读取CSV的代码

var dfGolub = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("golub_merged.csv")
    .drop("_c0") // drop the first column
    .repartition(numOfCores)
Run Code Online (Sandbox Code Playgroud)

尝试重命名列:

 def validifyColumnnames1(df : DataFrame) : DataFrame = {
     import org.apache.spark.sql.functions.col
     val cols = df.columns
     val colsRenamed = cols.map(name => col(name).as(name.replaceAll("\\.","")))
     df.select(colsRenamed : _*)
 }


def validifyColumnnames2[T](df : Dataset[T]) : DataFrame = {
    val newColumnNames = ArrayBuffer[String]()
    for(oldCol <- df.columns) {
        newColumnNames +=  oldCol.replaceAll("\\.","")
    }
    df.toDF(newColumnNames : _*)
}

def validifyColumnnames3(df : DataFrame) : DataFrame = {
    var newDf = df
    for(col <- df.columns){
        newDf = newDf.withColumnRenamed(col,col.replaceAll("\\.",""))
    }
    newDf
}
Run Code Online (Sandbox Code Playgroud)

任何想法是什么导致了这个性能问题?

设置:我在 Ubuntu 16.04local[24]上以 16 核 * 2 个线程和 96GB RAM 的模式运行 Spark 2.1.0

Ass*_*son 5

假设您知道类型,您可以简单地创建架构而不是推断它(推断架构会降低性能,甚至对于 csv 可能是错误的)。

为简单起见,让我们假设您有文件 example.csv,如下所示:

A.B, A.C, A.D
a,3,1
Run Code Online (Sandbox Code Playgroud)

你可以这样做:

val scehma = StructType(Seq(StructField("A_B",StringType),StructField("A_C", IntegerType), StructField("AD", IntegerType)))
val df = spark.read.option("header","true").schema(scehma).csv("example.csv")
df.show()

+---+---+---+
|A_B|A_C| AD|
+---+---+---+
|  a|  3|  1|
+---+---+---+
Run Code Online (Sandbox Code Playgroud)

如果您事先不知道信息,您可以像以前一样使用推断架构,然后您可以使用数据框生成架构:

val fields = for {
  x <- df.schema
} yield StructField(x.name.replaceAll("\\.",""), x.dataType, x.nullable)
val schema = StructType(fields)
Run Code Online (Sandbox Code Playgroud)

并像以前一样使用该模式重新读取数据帧