Mal*_*ath 4 python apache-spark pyspark airflow databricks
我的 S3 存储桶上有多个 JSON 文件(10 TB ~),我需要按每个 json 文档中存在的日期元素来组织这些文件。
我认为我的代码需要做什么
考虑到我正在处理的规模,我不确定这样做是否正确。
这是 json 文档的示例。每个文件都有多个这样的文档。
{
"id": 123456,
"creation_date": "2022-01-01T23:35:16",
"params": {
"doc_info": "AXBD",
"return_date": "20/05/2021",
"user_name": "XXXXXXXX",
"value": "40,00"
},
"user_id": "1234567",
"type": "TEST"
}
]
Run Code Online (Sandbox Code Playgroud)
这是我已经在DB笔记本上尝试过的,但实际上,我不能直接在笔记本上使用代码。我必须编写 Spark 代码并在气流 dag 上运行,因为我没有直接从笔记本使用存储桶的写入权限。
# Trying to read all the json files
df_test = spark.read.json("s3://my-bucket/**/**" + "/*.json")
# Filtering all documents that has the creation_date period that I want
df_test_filter = df_test.filter(F.col("creation_date").between('2022-01-01','2022-04-01'))
# Write parquet on another bucket
# In this test, I'm saving on a local bucket that I have write access.
df_test_filter.write.mode('overwrite').parquet("s3://my-local-test-bucket/")
Run Code Online (Sandbox Code Playgroud)
这似乎在我用来测试的单个 json 文件上运行良好,但我的问题是:
您只想运行该作业一次还是定期运行?
你所拥有的应该可以很好地发挥作用
# Trying to read all the json files
sdf = spark.read.json("s3://my-bucket/**/**/*.json")
Run Code Online (Sandbox Code Playgroud)
我唯一要添加的是按日期对输出进行分区以加快查询速度:
(
# Filtering all documents that has the creation_date period that I want
sdf.filter(F.col("creation_date").between('2022-01-01','2022-04-01'))
# Partition by creation date so that's easier to query
.partitionBy("creation_date")
# Export the data
.write.mode('append')
.parquet("s3://my-local-test-bucket/")
)
Run Code Online (Sandbox Code Playgroud)
这里我想知道文件结构是什么。按某些日期对数据进行分区是一个好主意,在这种情况下,您可能会将输入数据按另一个日期进行分区(也许insert_date?)。
假设是这种情况,我建议您每天读取该数据,然后将其写入按所需日期分区的实木复合地板中。
这将通过以下方式完成:
# Trying to read all the json files
sdf = spark.read.json(f"s3://my-bucket/insert_date={today:%Y-%m-%d}/*/")
sdf.partitionBy("creation_date").write.mode('append').parquet("s3://my-local-test-bucket/")
Run Code Online (Sandbox Code Playgroud)
稍后您可以简单地检索您需要的数据:
sdf = (
spark.read.json(f"s3://my-bucket/")
.where(F.col("creation_date").between('2022-01-01','2022-04-01'))
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
894 次 |
| 最近记录: |