在 PySpark 中获取序列文件格式的文件的 HDFS 文件路径

Arn*_*shn 2 sequencefile apache-spark pyspark

我在 HDFS 上的数据是序列文件格式。我正在使用 PySpark(Spark 1.6)并试图实现两件事:

  1. 数据路径包含 yyyy/mm/dd/hh 格式的时间戳,我想将其引入数据本身。我试过 SparkContext.wholeTextFiles 但我认为它可能不支持 Sequence 文件格式。

  2. 如果我想处理一天的数据并想将日期带入数据中,我该如何处理上述问题?在这种情况下,我将加载 yyyy/mm/dd/* 格式的数据。

感谢任何指针。

zer*_*323 5

如果存储类型与 SQL 类型兼容,并且您使用 Spark 2.0,那就很简单了。进口input_file_name

from pyspark.sql.functions import input_file_name 
Run Code Online (Sandbox Code Playgroud)

读取文件并转换为DataFrame

df = sc.sequenceFile("/tmp/foo/").toDF()
Run Code Online (Sandbox Code Playgroud)

添加文件名:

df.withColumn("input", input_file_name())
Run Code Online (Sandbox Code Playgroud)

如果此解决方案不适用于您的情况,那么通用方法是直接列出文件(对于 HDFS,您可以使用hdfs3库):

files = ...
Run Code Online (Sandbox Code Playgroud)

一一读取加上文件名:

def read(f):
    """Just to avoid problems with late binding"""
    return sc.sequenceFile(f).map(lambda x: (f, x))

rdds = [read(f) for f in files]
Run Code Online (Sandbox Code Playgroud)

和工会:

sc.union(rdds)
Run Code Online (Sandbox Code Playgroud)