Spark加载数据并将文件名添加为dataframe列

yee*_*379 24 apache-spark apache-spark-sql pyspark

我正在使用包装函数将一些数据加载到Spark中:

def load_data( filename ):
    df = sqlContext.read.format("com.databricks.spark.csv")\
        .option("delimiter", "\t")\
        .option("header", "false")\
        .option("mode", "DROPMALFORMED")\
        .load(filename)
    # add the filename base as hostname
    ( hostname, _ ) = os.path.splitext( os.path.basename(filename) )
    ( hostname, _ ) = os.path.splitext( hostname )
    df = df.withColumn('hostname', lit(hostname))
    return df
Run Code Online (Sandbox Code Playgroud)

具体来说,我使用glob来一次加载一堆文件:

df = load_data( '/scratch/*.txt.gz' )
Run Code Online (Sandbox Code Playgroud)

文件是:

/scratch/host1.txt.gz
/scratch/host2.txt.gz
...
Run Code Online (Sandbox Code Playgroud)

我想列"主机名"实际上包含文件的真实名称被加载,而不是水珠(即host1,host2等等,而不是*).

我怎样才能做到这一点?

use*_*411 43

你可以使用input_file_name哪个:

为当前Spark任务的文件名创建字符串列.

from  pyspark.sql.functions import input_file_name

df.withColumn("filename", input_file_name())
Run Code Online (Sandbox Code Playgroud)

  • 也适用于Scala。导入org.apache.spark.sql.functions._ df.withColumn(“ filename”,input_file_name)` (3认同)
  • 如果加载 csv 文件,请勿使用 `spark.read.csv("<data_path>")`,而使用 `spark.read.format("csv").load("<data_path>")` (3认同)
  • 仅供记录:此方法位于[pyspark.sql.functions](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions. input_file_name)包. (2认同)