Arn*_*shn 2 sequencefile apache-spark pyspark
我在 HDFS 上的数据是序列文件格式。我正在使用 PySpark(Spark 1.6)并试图实现两件事:
数据路径包含 yyyy/mm/dd/hh 格式的时间戳,我想将其引入数据本身。我试过 SparkContext.wholeTextFiles 但我认为它可能不支持 Sequence 文件格式。
如果我想处理一天的数据并想将日期带入数据中,我该如何处理上述问题?在这种情况下,我将加载 yyyy/mm/dd/* 格式的数据。
感谢任何指针。
如果存储类型与 SQL 类型兼容,并且您使用 Spark 2.0,那就很简单了。进口input_file_name:
from pyspark.sql.functions import input_file_name
Run Code Online (Sandbox Code Playgroud)
读取文件并转换为DataFrame:
df = sc.sequenceFile("/tmp/foo/").toDF()
Run Code Online (Sandbox Code Playgroud)
添加文件名:
df.withColumn("input", input_file_name())
Run Code Online (Sandbox Code Playgroud)
如果此解决方案不适用于您的情况,那么通用方法是直接列出文件(对于 HDFS,您可以使用hdfs3库):
files = ...
Run Code Online (Sandbox Code Playgroud)
一一读取加上文件名:
def read(f):
"""Just to avoid problems with late binding"""
return sc.sequenceFile(f).map(lambda x: (f, x))
rdds = [read(f) for f in files]
Run Code Online (Sandbox Code Playgroud)
和工会:
sc.union(rdds)
Run Code Online (Sandbox Code Playgroud)