soa*_*gem 5 scala apache-spark parquet
我使用 Spark 将多个 parquet 文件读取到单个 RDD 中,并使用标准通配符路径约定。换句话说,我正在做这样的事情:
val myRdd = spark.read.parquet("s3://my-bucket/my-folder/**/*.parquet")
Run Code Online (Sandbox Code Playgroud)
然而,有时这些 Parquet 文件会有不同的架构。当我对 RDD 进行转换时,我可以通过查找某些列是否存在(或不存在)来尝试在映射函数中区分它们。然而,要知道 RDD 中给定行使用哪个模式(以及我在这里具体询问的方式),一个可靠的方法是知道我正在查看哪个文件路径。
在 RDD 级别上,有什么方法可以判断当前行来自哪个特定的 parquet 文件吗?所以想象一下我的代码目前看起来像这样(这是一个简化的示例):
val mapFunction = new MapFunction[Row, (String, Row)] {
override def call(row: Row): (String, Row) = myJob.transform(row)
}
val pairRdd = myRdd.map(mapFunction, encoder=kryo[(String, Row)]
Run Code Online (Sandbox Code Playgroud)
在myJob.transform( )代码中,我用其他值装饰结果,将其转换为一对 RDD,并执行一些其他转换。
我使用该row.getAs( ... )方法来查找特定的列值,这是一个非常有用的方法。我想知道是否有任何类似的方法(例如row.getInputFile( )或类似的方法)来获取我当前正在操作的特定文件的名称?
由于我传递通配符以将多个镶木地板文件读取到单个 RDD 中,因此我不知道我正在操作哪个文件。如果没有别的事,我希望有一种方法可以用输入文件名来装饰 RDD 行。这可能吗?
小智 11
您可以为文件名添加新列,如下所示
import org.apache.spark.sql.functions._
val myDF = spark.read.parquet("s3://my-bucket/my-folder/**/*.parquet").withColumn("inputFile", input_file_name())
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4056 次 |
| 最近记录: |