如何向 Spark 中的现有分区添加行?

Avi*_*ava 3 amazon-s3 apache-spark pyspark

我必须更新历史数据。我所说的更新是指向 S3 上的现有分区添加新行,有时甚至添加新列。

当前分区是按日期实现的:created_year={}/created_month={}/created_day={}。为了避免每个分区有太多对象,我执行以下操作来维护单个对象/分区:

def save_repartitioned_dataframe(bucket_name, df):
    dest_path = form_path_string(bucket_name, repartitioned_data=True)
    print('Trying to save repartitioned data at: {}'.format(dest_path))
    df.repartition(1, "created_year", "created_month", "created_day").write.partitionBy(
        "created_year", "created_month", "created_day").parquet(dest_path)
    print('Data repartitioning complete with at the following location: ')
    print(dest_path)
    _, count, distinct_count, num_partitions = read_dataframe_from_bucket(bucket_name, repartitioned_data=True)
    return count, distinct_count, num_partitions
Run Code Online (Sandbox Code Playgroud)

存在一种情况,我必须添加具有这些列值的某些行:

created_year | created_month | created_day
2019         |10             |27   
Run Code Online (Sandbox Code Playgroud)

这意味着此路径中的文件(S3 对象)created_year=2019/created_month=10/created_day=27/some_random_name.parquet将附加新行。

如果架构发生更改,则所有对象都必须实现该更改。

我尝试研究它通常是如何工作的,所以,有两种感兴趣的模式:覆盖、追加。

第一个将仅添加当前数据并删除其余数据。我不希望出现这种情况。第二个将追加,但最终可能会创建更多对象。我也不希望出现这种情况。我还了解到 Spark 中的数据帧是不可变的。

那么,如何实现在新数据到达现有分区时将其追加并每天维护一个对象呢?

小智 5

根据您的问题,我了解您需要向现有数据添加新行,同时不增加镶木地板文件的数量。这可以通过对特定分区文件夹进行操作来实现。执行此操作时可能会出现三种情况。

1)新建分区

这意味着传入数据在分区列中具有新值。就您而言,这可能是这样的:

现有数据

| year | month | day |
| ---- | ----- | --- |
| 2020 |   1   |  1  |
Run Code Online (Sandbox Code Playgroud)

新数据

| year | month | day |
| ---- | ----- | --- |
| 2020 |   1   |  2  |
Run Code Online (Sandbox Code Playgroud)

因此,在这种情况下,您只需为传入数据创建一个新的分区文件夹并按原样保存即可。

| year | month | day |
| ---- | ----- | --- |
| 2020 |   1   |  1  |
Run Code Online (Sandbox Code Playgroud)

2)现有分区,新数据

这是您想要将新行追加到现有数据的位置。它可能是这样的:

现有数据

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  a  |   1   |
Run Code Online (Sandbox Code Playgroud)

新数据

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  b  |   1   |
Run Code Online (Sandbox Code Playgroud)

这里我们有同一分区的新记录。您可以使用“附加模式”,但您希望每个分区文件夹中都有一个 parquet 文件。这就是为什么您应该首先读取现有分区,将其与新数据合并,然后将其写回。

| year | month | day |
| ---- | ----- | --- |
| 2020 |   1   |  2  |
Run Code Online (Sandbox Code Playgroud)

3)现有分区、现有数据

如果传入的数据是UPDATE而不是INSERT怎么办?在这种情况下,您应该更新一行而不是插入新行。想象一下:

现有数据

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  a  |   1   |
Run Code Online (Sandbox Code Playgroud)

新数据

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  a  |   2   |
Run Code Online (Sandbox Code Playgroud)

“a”之前的值为 1,现在我们希望它为 2。因此,在这种情况下,您应该读取现有数据并更新现有记录。这可以通过如下方式实现。

partition_path = "/path/to/data/year=2020/month=1/day=2"
new_data.repartition(1, "year", "month", "day").write.parquet(partition_path)
Run Code Online (Sandbox Code Playgroud)

当我们将旧数据与新数据进行外连接时,可能会发生四件事,

  • 两个数据具有相同的值,取哪一个并不重要
  • 两个数据有不同的值,取新值
  • 旧数据没有价值,新数据有价值,取新数据
  • 新数据没有价值,旧数据有价值,取旧数据

为了实现我们在这里的愿望,coalesce我们pyspark.sql.functions会做工作。

请注意,此解决方案也涵盖了第二种情况。

关于架构变更

Spark 支持 parquet 文件格式的架构合并。这意味着您可以在数据中添加或删除列。当您添加或删除列时,您会发现从顶层读取数据时某些列不存在。这是因为 Spark 默认禁用模式合并。从文档中:

与 Protocol Buffer、Avro 和 Thrift 一样,Parquet 也支持模式演化。用户可以从简单的架构开始,然后根据需要逐渐向架构添加更多列。这样,用户最终可能会得到多个具有不同但相互兼容架构的 Parquet 文件。Parquet 数据源现在能够自动检测这种情况并合并所有这些文件的架构。

为了能够读取所有列,您需要将该mergeSchema选项设置为true

| year | month | day | key | value |
| ---- | ----- | --- | --- | ----- |
| 2020 |   1   |  1  |  a  |   1   |
Run Code Online (Sandbox Code Playgroud)