使用 pyspark 按日期元素读取 json 文件并对其进行分组

Mal*_*ath 4 python apache-spark pyspark airflow databricks

我的 S3 存储桶上有多个 JSON 文件(10 TB ~),我需要按每个 json 文档中存在的日期元素来组织这些文件。

我认为我的代码需要做什么

  • 读取s3存储桶中的所有json文件。
  • 保留 2022-01-01 和 2022-04-01 之间包含元素“creation_date”的所有文档
  • 将它们以镶木地板格式保存在另一个存储桶中。

考虑到我正在处理的规模,我不确定这样做是否正确。

这是 json 文档的示例。每个文件都有多个这样的文档。

  {
    "id": 123456,
    "creation_date": "2022-01-01T23:35:16",
    "params": {
      "doc_info": "AXBD",
      "return_date": "20/05/2021",
      "user_name": "XXXXXXXX",
      "value": "40,00"
    },
    "user_id": "1234567",
    "type": "TEST"
  }
]
Run Code Online (Sandbox Code Playgroud)

这是我已经在DB笔记本上尝试过的,但实际上,我不能直接在笔记本上使用代码。我必须编写 Spark 代码并在气流 dag 上运行,因为我没有直接从笔记本使用存储桶的写入权限。

# Trying to read all the json files
df_test = spark.read.json("s3://my-bucket/**/**" + "/*.json")

# Filtering all documents that has the creation_date period that I want
df_test_filter = df_test.filter(F.col("creation_date").between('2022-01-01','2022-04-01'))

# Write parquet on another bucket
# In this test, I'm saving on a local bucket that I have write access.
df_test_filter.write.mode('overwrite').parquet("s3://my-local-test-bucket/")
Run Code Online (Sandbox Code Playgroud)

这似乎在我用来测试的单个 json 文件上运行良好,但我的问题是:

  • 如果没有 databricks 笔记本并使用带有 pyspark 的气流 dag,我该如何做到这一点?
  • 在思考性能问题时,有更好的方法吗?

vil*_*oro 5

您只想运行该作业一次还是定期运行?

一跑

你所拥有的应该可以很好地发挥作用

# Trying to read all the json files
sdf = spark.read.json("s3://my-bucket/**/**/*.json")
Run Code Online (Sandbox Code Playgroud)

我唯一要添加的是按日期对输出进行分区以加快查询速度:

(
    # Filtering all documents that has the creation_date period that I want
    sdf.filter(F.col("creation_date").between('2022-01-01','2022-04-01'))
    # Partition by creation date so that's easier to query
    .partitionBy("creation_date")
    # Export the data
    .write.mode('append')
    .parquet("s3://my-local-test-bucket/")
)
Run Code Online (Sandbox Code Playgroud)

定期运行

这里我想知道文件结构是什么。按某些日期对数据进行分区是一个好主意,在这种情况下,您可能会将输入数据按另一个日期进行分区(也许insert_date?)。

假设是这种情况,我建议您每天读取该数据,然后将其写入按所需日期分区的实木复合地板中。

这将通过以下方式完成:

# Trying to read all the json files
sdf = spark.read.json(f"s3://my-bucket/insert_date={today:%Y-%m-%d}/*/")

sdf.partitionBy("creation_date").write.mode('append').parquet("s3://my-local-test-bucket/")
Run Code Online (Sandbox Code Playgroud)

稍后您可以简单地检索您需要的数据:

sdf = (
    spark.read.json(f"s3://my-bucket/")
    .where(F.col("creation_date").between('2022-01-01','2022-04-01'))
)
Run Code Online (Sandbox Code Playgroud)