How to skip the header when paginating AWS Athena query results

isc*_*odv 5 python amazon-web-services boto3 amazon-athena aws-glue

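I have an Angular 6 application that requests data from AWS Lambda. The data itself is stored in a Glue database and queried with AWS Athena. The AWS Glue database has the skip.header.line.count=1 option set, and when I run an Athena query in the console I get a response without a header. The problem occurs when I try to retrieve the data using boto3. I have a function that runs the query and then paginates through the results: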

def run_query_paged(self, query, page_token=None, page_size=10):
    """
    Run query.
    """
    request = self.athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': self.database
            },
        ResultConfiguration={
            'OutputLocation': self.s3_output,
            }
        )
    execution_id = request['QueryExecutionId']

    if execution_id:
        while True:
            stats = self.athena_client.get_query_execution(QueryExecutionId=execution_id)
            status = stats['QueryExecution']['Status']['State']
            if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
                break
            time.sleep(0.2)  # 200ms

        if status == 'SUCCEEDED':
            paginator = self.athena_client.get_paginator('get_query_results')
            pagination_config = {
                'MaxItems': page_size,
                'PageSize': page_size,
            }
            if page_token:
                pagination_config['StartingToken'] = page_token

            response_iterator = paginator.paginate(
                QueryExecutionId=execution_id,
                PaginationConfig=pagination_config
            )

            for page in response_iterator:
                next_token = page.get('NextToken', '')
                results = page
                break

            return {
                'rows': process_results(results),
                'nextToken': next_token
            }
        if status == 'FAILED':
            raise Exception(stats['QueryExecution']['Status']['StateChangeReason'])

    return None

The process_results function converts the response into a list, taking column types into account:

def process_results(response):
    """
    Processes the result of get_query_results function
    """
    rows = response['ResultSet']['Rows']
    meta = response['ResultSet']['ResultSetMetadata']['ColumnInfo']
    result = []
    for row in rows:
        parsed_row = {}
        for idx, val in enumerate(row['Data']):
            field = val
            column_info = meta[idx]
            if 'VarCharValue' in val:
                value = val['VarCharValue']
            else:
                value = ''
            parsed_row[column_info['Name']] = process_row_value(value, column_info)
        result.append(parsed_row)
    return result
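The question does not show process_row_value, so the following is only a guess at what such a helper might look like. It assumes each ColumnInfo entry carries a Type string such as integer or double, and it falls back to the raw string when conversion fails. That fallback would explain why the header row comes through as "foo": "foo" even for a numeric column. The type sets and the fallback behaviour are assumptions, not the asker's code.

```python
from typing import Any, Dict

# Hypothetical groupings of Athena type names; extend as needed.
_INT_TYPES = {'tinyint', 'smallint', 'integer', 'bigint'}
_FLOAT_TYPES = {'float', 'double', 'real'}


def process_row_value(value: str, column_info: Dict[str, Any]) -> Any:
    """Convert a raw VarCharValue string according to the column type."""
    col_type = column_info.get('Type', 'varchar')
    try:
        if col_type in _INT_TYPES:
            return int(value)
        if col_type in _FLOAT_TYPES:
            return float(value)
    except ValueError:
        # Not convertible (for example a header cell or an empty string):
        # return the raw string unchanged.
        return value
    if col_type == 'boolean':
        return value.lower() == 'true'
    return value
```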

The problem is that the first page of the paginated response contains a header row holding the column names, like this:

{
    "foo": "foo",
    "bar": "bar"
},
{
    "foo": 1,
    "bar": 2
},
...

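while none of the other pages have it. When I request the first page from the client application I get the header plus 9 rows (the page size is 10), and when I request the next page using NextToken I get 10 rows and no header. Showing 9 items on the first page and 10 on every subsequent page is rather awkward.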

How can I paginate through the results while skipping the header?

isc*_*odv 2

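I haven't found any option to skip the header, so I hacked around it by requesting page_size + 1 results in the first request and page_size results in each request after that.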

def _build_response(self, execution_id: str, starting_token: Optional[str], page_size: int) -> AthenaPagedResult:
    """
    Returns the query result for the provided page as well as a token to the next page if there are more
    results to retrieve for the query.
    """
    paginator = self.athena_client.get_paginator('get_query_results')

    # The first page of the response contains the header row. Request one extra row for the first
    # page, then drop the header, so that every page returns the same number of data rows.
    if starting_token:
        skip_header = False
    else:
        page_size += 1
        skip_header = True
    max_items = page_size * 2

    pagination_config = {
        'MaxItems': min(max_items, MAXIMUM_ALLOWED_ITEMS_NUMBER),
        'PageSize': min(page_size, MAXIMUM_ALLOWED_ITEMS_NUMBER)
    }
    if starting_token:
        pagination_config['StartingToken'] = starting_token

    response_iterator = paginator.paginate(QueryExecutionId=execution_id, PaginationConfig=pagination_config)


    iterator_index = 0
    results = EMPTY_ATHENA_RESPONSE
    next_token = None

    # Retrieve only a single page and return the next token for the caller to iterate the response.
    for page in response_iterator:
        if iterator_index > 0:
            if len(page['ResultSet']['Rows']) == 0:
                next_token = None
            break
        next_token = page.get('NextToken')
        results = page
        iterator_index += 1

    # ... process and return results
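The elided "process and return results" step still has to remove the header from the first page. A minimal sketch of that step follows, assuming the header row simply repeats the column names listed in ResultSetMetadata. The helper name drop_header_row is hypothetical and not part of boto3. Comparing against the column names, rather than always slicing off the first row, is a defensive choice in case a query returns no header.

```python
from typing import Any, Dict, List


def drop_header_row(page: Dict[str, Any], is_first_page: bool) -> List[Dict[str, Any]]:
    """Return the rows of one get_query_results page without the header row."""
    rows = page['ResultSet']['Rows']
    if not is_first_page or not rows:
        return rows

    column_names = [
        col['Name'] for col in page['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]
    first_row = [cell.get('VarCharValue') for cell in rows[0]['Data']]

    # Drop the first row only if it really repeats the column names.
    if first_row == column_names:
        return rows[1:]
    return rows
```

Inside _build_response this could be called as drop_header_row(results, skip_header) before the rows are handed to process_results.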