使用 Amazon Glue 将一行转换为多行

Dwa*_*ill 2 bigdata apache-spark pyspark aws-glue

我正在尝试使用 Amazon Glue 将一行变成多行。我的目标是类似于 SQL UNPIVOT。

我有一个 360GB 的管道分隔文本文件,压缩 (gzip)。它有超过 1,620 列。这是基本布局:

primary_key|property1_name|property1_value|property800_name|property800_value
12345|is_male|1|is_college_educated|1
Run Code Online (Sandbox Code Playgroud)

这些属性名称/值字段有 800 多个。大约有 2.8 亿行。该文件位于 S3 存储桶中。我需要将数据导入 Redshift,但 Redshift 中的列限制为 1,600。

用户希望我取消数据透视。例如:

primary_key|key|value
12345|is_male|1
12345|is_college_educated|1
Run Code Online (Sandbox Code Playgroud)

我相信我可以为此使用 Amazon Glue。但是,这是我第一次使用 Glue。我正在努力想出一个好方法来做到这一点。一些 pySpark 扩展转换看起来很有希望(也许,“映射”或“关系化”)。请参阅http://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-etl-scripts-pyspark-transforms.html。所以,我的问题是:在 Glue 中这样做的好方法是什么?

谢谢。

Ant*_*lov 6

AWS Glue 没有合适的内置GlueTransform子类来将单个转换DynamicRecord为多个(通常 MapReduce 映射器可以做到)。您自己也无法创建这样的转换。

但是有两种方法可以解决您的问题。

选项 1:使用 Spark RDD API

让我们尝试完全执行您需要的操作:将单个记录映射到多个记录。由于GlueTransform限制,我们将不得不深入研究并使用 Spark RDD API。

RDD 有特殊的flatMap方法可以产生多个Row然后被展平的。您示例的代码如下所示:

source_data = somehow_get_the_data_into_glue_dynamic_frame()
source_data_rdd = source_data.toDF().rdd
unpivoted_data_rdd = source_data_rdd.flatMap(
    lambda row: (
        (
            row.id,
            getattr(row, f'{field}_name'),
            getattr(row, f'{field}_value'),
        )
        for field in properties_names
    ),
)
unpivoted_data = glue_ctx.create_dynamic_frame \
    .from_rdd(unpivoted_data_rdd, name='unpivoted')
Run Code Online (Sandbox Code Playgroud)

选项 2:映射 + 关系化 + 连接

如果您只想使用 AWS Glue ETL API 执行请求的操作,那么以下是我的说明:

  1. 首先每个DynamicRecord映射到主键和对象列表:
mapped = Map.apply(
    source_data,
    lambda record:  # here we operate on DynamicRecords not RDD Rows
        DynamicRecord(
            primary_key=record.primary_key,
            fields=[
                dict(
                    key=getattr(row, f'{field}_name'),
                    value=getattr(row, f'{field}_value'),
                )
                for field in properties_names
            ],
        )
)
Run Code Online (Sandbox Code Playgroud)

示例输入:

primary_key|property1_name|property1_value|property800_name|property800_value
      12345|is_male       |              1|is_new          |                1
      67890|is_male       |              0|is_new          |                0
Run Code Online (Sandbox Code Playgroud)

输出:

primary_key|fields
      12345|[{'key': 'is_male', 'value': 1}, {'key': 'is_new', 'value': 1}]
      67890|[{'key': 'is_male', 'value': 0}, {'key': 'is_new', 'value': 0}]
Run Code Online (Sandbox Code Playgroud)
  1. 接下来将它关联起来:每个列表将被转换为多行,每个嵌套对象都将被取消嵌套(Scala Glue ETL API 文档比 Python 文档有很好的示例和更详细的解释)。
relationalized_dfc = Relationalize.apply(
    mapped,
    staging_path='s3://tmp-bucket/tmp-dir/',  # choose any dir for temp files
)
Run Code Online (Sandbox Code Playgroud)

该方法返回DynamicFrameCollection。在单个数组字段的情况下,它将包含两个DynamicFrame:首先primary_key是扁平化和非嵌套fields动态框架的外键和外键。输出:

# table name: roottable
primary_key|fields
      12345|     1
      67890|     2
Run Code Online (Sandbox Code Playgroud)
# table name: roottable.fields
id|index|val.key|val.value
 1|    0|is_male|        1
 1|    1|is_new |        1
 2|    0|is_male|        0
 2|    1|is_new |        0
Run Code Online (Sandbox Code Playgroud)
  1. 最后一个合乎逻辑的步骤是加入这两个DynamicFrame
primary_key|property1_name|property1_value|property800_name|property800_value
      12345|is_male       |              1|is_new          |                1
      67890|is_male       |              0|is_new          |                0
Run Code Online (Sandbox Code Playgroud)

输出:

primary_key|fields|id|index|val.key|val.value
      12345|     1| 1|    0|is_male|        1
      12345|     1| 1|    1|is_new |        1
      67890|     2| 2|    0|is_male|        0
      67890|     2| 2|    1|is_new |        0
Run Code Online (Sandbox Code Playgroud)

现在您只需重命名选择所需的字段。