WIT*_*WIT 3 python apache-spark apache-spark-sql pyspark databricks
我在 S3 中有很多行分隔的 json 文件,想要读取 Spark 中的所有这些文件,然后读取 json 中的每一行,并为该行输出一个 Dict/Row,并将文件名作为一列。我将如何在 python 中以有效的方式做到这一点?每个 json 大约为 200 MB。
下面是一个文件示例(像这样有 200,000 行),将此文件命名为 class_scores_0219:
{"name": "Maria C", "class":"Math", "score":"80", "student_identification":22}
{"name": "Maria F", "class":"Physics", "score":"90", "student_identification":12}
{"name": "Fink", "class":"English", "score":"75", "student_identification":7}
Run Code Online (Sandbox Code Playgroud)
输出 DataFrame 为(为简单起见,仅显示一行):
+-------------------+---------+-------+-------+------------------------+
| file_name | name | class | score | student_identification |
+-------------------+---------+-------+-------+------------------------+
| class_scores_0219 | Maria C | Math | 80 | 22 |
+-------------------+---------+-------+-------+------------------------+
Run Code Online (Sandbox Code Playgroud)
我已经使用以下方法设置了 s3 密钥/访问密钥:(sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", SECRET_KEY)
访问密钥也是如此),但可以根据需要以不同的方式进行连接。
我愿意接受任何最有效的选项,我可以提供文件列表并将其输入,或者我可以连接到 boto3 并提供前缀。我是 Spark 新手,因此感谢所有帮助。
您可以通过使用 Spark 本身来实现这一点。
只需添加一个带有 input_file_names 的新列,您就会得到所需的结果
from pyspark.sql.functions import input_file_name
df = spark.read.json(path_to_you_folder_conatining_multiple_files)
df = df.withColumn('fileName',input_file_name())
Run Code Online (Sandbox Code Playgroud)
如果您想读取多个文件,可以将它们作为文件列表传递
files = [file1, file2, file3]
df = spark.read.json(*files)
Run Code Online (Sandbox Code Playgroud)
或者,如果您的文件列表与通配符匹配,那么您可以像下面这样使用它
df = spark.read.json('path/to/file/load2020*.json')
Run Code Online (Sandbox Code Playgroud)
或者您可以使用 boto3 列出文件夹中的所有对象,然后创建所需文件的列表并将其传递给 df。
希望能帮助到你。