Boe*_*ern 3 csv scala apache-spark
为了能够在DataFrame
不转义的情况下使用.
我的列名,我需要一个函数来“验证”所有列名 - 但我尝试过的所有方法都没有及时完成这项工作(我在 5 分钟后中止)。
我正在尝试我的算法的数据集是 golub 数据集(在此处获取)。这是一个 2.2MB 的 CSV 文件,有 7200 列。重命名所有列应该是几秒钟的事情
读取CSV的代码
var dfGolub = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("golub_merged.csv")
.drop("_c0") // drop the first column
.repartition(numOfCores)
Run Code Online (Sandbox Code Playgroud)
尝试重命名列:
def validifyColumnnames1(df : DataFrame) : DataFrame = {
import org.apache.spark.sql.functions.col
val cols = df.columns
val colsRenamed = cols.map(name => col(name).as(name.replaceAll("\\.","")))
df.select(colsRenamed : _*)
}
def validifyColumnnames2[T](df : Dataset[T]) : DataFrame = {
val newColumnNames = ArrayBuffer[String]()
for(oldCol <- df.columns) {
newColumnNames += oldCol.replaceAll("\\.","")
}
df.toDF(newColumnNames : _*)
}
def validifyColumnnames3(df : DataFrame) : DataFrame = {
var newDf = df
for(col <- df.columns){
newDf = newDf.withColumnRenamed(col,col.replaceAll("\\.",""))
}
newDf
}
Run Code Online (Sandbox Code Playgroud)
任何想法是什么导致了这个性能问题?
设置:我在 Ubuntu 16.04local[24]
上以 16 核 * 2 个线程和 96GB RAM 的模式运行 Spark 2.1.0
假设您知道类型,您可以简单地创建架构而不是推断它(推断架构会降低性能,甚至对于 csv 可能是错误的)。
为简单起见,让我们假设您有文件 example.csv,如下所示:
A.B, A.C, A.D
a,3,1
Run Code Online (Sandbox Code Playgroud)
你可以这样做:
val scehma = StructType(Seq(StructField("A_B",StringType),StructField("A_C", IntegerType), StructField("AD", IntegerType)))
val df = spark.read.option("header","true").schema(scehma).csv("example.csv")
df.show()
+---+---+---+
|A_B|A_C| AD|
+---+---+---+
| a| 3| 1|
+---+---+---+
Run Code Online (Sandbox Code Playgroud)
如果您事先不知道信息,您可以像以前一样使用推断架构,然后您可以使用数据框生成架构:
val fields = for {
x <- df.schema
} yield StructField(x.name.replaceAll("\\.",""), x.dataType, x.nullable)
val schema = StructType(fields)
Run Code Online (Sandbox Code Playgroud)
并像以前一样使用该模式重新读取数据帧
归档时间: |
|
查看次数: |
824 次 |
最近记录: |