通过AWS上的API在粘合表上添加分区?

Gud*_*dzo 4 amazon-s3 amazon-web-services amazon-athena aws-glue

我有一个S3存储桶,该存储桶不断填充新数据,我正在使用Athena和Glue查询该数据,问题是,如果胶水不知道创建了新分区,那么它不会搜索需要搜索的分区那里。如果我每次需要一个新分区时都要进行一次API调用来运行Glue搜寻器,那么这样做太昂贵了,因此最好的解决方案是告诉胶水添加了一个新分区,即在其属性表中创建一个新分区。我浏览了AWS文档,但没有走运,我将Java与AWS结合使用。有什么帮助吗?

con*_*fun 8

您可能要使用batch_create_partition()胶水api注册新分区。它不需要任何昂贵的操作,例如MSCK REPAIR TABLE或重新爬网。

我有一个类似的用例,为此我编写了一个执行以下操作的python脚本-

步骤1-获取表信息并从中解析注册分区所需的必要信息。

# Fetching table information from glue catalog
logger.info("Fetching table info for {}.{}".format(l_database, l_table))
try:
    response = l_client.get_table(
        CatalogId=l_catalog_id,
        DatabaseName=l_database,
        Name=l_table
    )
except Exception as error:
    logger.error("Exception while fetching table info for {}.{} - {}"
                 .format(l_database, l_table, error))
    sys.exit(-1)

# Parsing table info required to create partitions from table
input_format = response['Table']['StorageDescriptor']['InputFormat']
output_format = response['Table']['StorageDescriptor']['OutputFormat']
table_location = response['Table']['StorageDescriptor']['Location']
serde_info = response['Table']['StorageDescriptor']['SerdeInfo']
partition_keys = response['Table']['PartitionKeys']
Run Code Online (Sandbox Code Playgroud)

第2步-生成列表字典,其中每个列表都包含创建单个分区的信息。所有列表将具有相同的结构,但是其分区特定值将更改(年,月,日,小时)

def generate_partition_input_list(start_date, num_of_days, table_location,
                                  input_format, output_format, serde_info):
    input_list = []  # Initializing empty list
    today = datetime.utcnow().date()
    if start_date > today:  # To handle scenarios if any future partitions are created manually
        start_date = today
    end_date = today + timedelta(days=num_of_days)  # Getting end date till which partitions needs to be created
    logger.info("Partitions to be created from {} to {}".format(start_date, end_date))

    for input_date in date_range(start_date, end_date):
        # Formatting partition values by padding required zeroes and converting into string
        year = str(input_date)[0:4].zfill(4)
        month = str(input_date)[5:7].zfill(2)
        day = str(input_date)[8:10].zfill(2)
        for hour in range(24):  # Looping over 24 hours to generate partition input for 24 hours for a day
            hour = str('{:02d}'.format(hour))  # Padding zero to make sure that hour is in two digits
            part_location = "{}{}/{}/{}/{}/".format(table_location, year, month, day, hour)
            input_dict = {
                'Values': [
                    year, month, day, hour
                ],
                'StorageDescriptor': {
                    'Location': part_location,
                    'InputFormat': input_format,
                    'OutputFormat': output_format,
                    'SerdeInfo': serde_info
                }
            }
            input_list.append(input_dict.copy())
    return input_list
Run Code Online (Sandbox Code Playgroud)

第3步-调用batch_create_partition()API

for each_input in break_list_into_chunks(partition_input_list, 100):
    create_partition_response = client.batch_create_partition(
        CatalogId=catalog_id,
        DatabaseName=l_database,
        TableName=l_table,
        PartitionInputList=each_input
    )
Run Code Online (Sandbox Code Playgroud)

一个api调用中最多只能有100个分区,因此,如果要创建100个以上的分区,则需要将列表分成多个块并对其进行迭代。

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.batch_create_partition


bot*_*que 6

  1. 您可以将您的胶水目录配置为每 5 分钟触发一次
  2. 您可以创建一个 lambda 函数,该函数将按计划运行,或者由您的存储桶中的事件(例如 putObject 事件)触发,并且该函数可以调用 athena 来发现分区

    import boto3
    
    athena = boto3.client('athena')
    
    def lambda_handler(event, context):
        athena.start_query_execution(
            QueryString = "MSCK REPAIR TABLE mytable",
            ResultConfiguration = {
                'OutputLocation': "s3://some-bucket/_athena_results"
            }
    
    Run Code Online (Sandbox Code Playgroud)
  3. 使用 Athena 手动添加分区。您还可以通过 API 运行 sql 查询,就像在我的 lambda 示例中一样。

    雅典娜手册中的示例:

    ALTER TABLE orders ADD
      PARTITION (dt = '2016-05-14', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_14_May_2016'
      PARTITION (dt = '2016-05-15', country = 'IN') LOCATION 's3://mystorage/path/to/INDIA_15_May_2016';
    
    Run Code Online (Sandbox Code Playgroud)


rav*_*ard 6

这个问题很老了,但我想把它放在那里,有人可以让s3:ObjectCreated:Put通知触发 Lambda 函数,当数据到达 S3 时,该函数会注册新分区。我什至会扩展这个函数来处理基于对象删除等的弃用。以下是 AWS 发布的博客文章,其中详细介绍了 S3 事件通知:https://aws.amazon.com/blogs/aws/s3-event-notification/