isc*_*odv 5 python amazon-web-services boto3 amazon-athena aws-glue
我有一个 Angular 6 应用程序,它从 AWS Lambda 请求数据。数据本身存储在 Glue 数据库中并使用 AWS Athena 进行查询。AWS Glue 数据库skip.header.line.count=1设置了选项,当我在控制台中运行 Athena 查询时,我得到的响应没有标头。当我尝试使用 检索数据时会出现问题boto3。我有一个运行查询然后对结果进行分页的函数:
def run_query_paged(self, query, page_token=None, page_size=10):
"""
Run query.
"""
request = self.athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': self.database
},
ResultConfiguration={
'OutputLocation': self.s3_output,
}
)
execution_id = request['QueryExecutionId']
if execution_id:
while True:
stats = self.athena_client.get_query_execution(QueryExecutionId=execution_id)
status = stats['QueryExecution']['Status']['State']
if status in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
break
time.sleep(0.2) # 200ms
if status == 'SUCCEEDED':
paginator = self.athena_client.get_paginator('get_query_results')
pagination_config = {
'MaxItems': page_size,
'PageSize': page_size,
}
if page_token:
pagination_config['StartingToken'] = page_token
response_iterator = paginator.paginate(
QueryExecutionId=execution_id,
PaginationConfig=pagination_config
)
for page in response_iterator:
next_token = page.get('NextToken', '')
results = page
break
return {
'rows': process_results(results),
'nextToken': next_token
}
if status == 'FAILED':
raise Exception(stats['QueryExecution']['Status']['StateChangeReason'])
return None
Run Code Online (Sandbox Code Playgroud)
该process_results函数将响应转换为考虑列类型的列表:
def process_results(response):
"""
Processes the result of get_query_results function
"""
rows = response['ResultSet']['Rows']
meta = response['ResultSet']['ResultSetMetadata']['ColumnInfo']
result = []
for row in rows:
parsed_row = {}
for idx, val in enumerate(row['Data']):
field = val
column_info = meta[idx]
if 'VarCharValue' in val:
value = val['VarCharValue']
else:
value = ''
parsed_row[column_info['Name']] = process_row_value(value, column_info)
result.append(parsed_row)
return result
Run Code Online (Sandbox Code Playgroud)
问题是分页响应的第一页的标题具有如下列名称:
{
"foo": "foo",
"bar": "bar"
},
{
"foo": 1,
"bar": 2
},
...
Run Code Online (Sandbox Code Playgroud)
而所有其他页面都没有它。当我从客户端应用程序请求第一页时,我会得到一个标题加上 9 行(页面大小为 10),当我使用以下命令请求下一页时,NextToken我会得到 10 行,但没有标题。第一页显示 9 项,后续页显示 10 项,这是相当尴尬的。
如何跳过标题对结果进行分页?
我还没有找到任何跳过标头的选项,并通过page_size + 1在第一个请求中请求结果来破解它,然后再page_size请求其余的结果。
def _build_response(self, execution_id: str, starting_token: Optional[str], page_size: int) -> AthenaPagedResult:
"""
Returns the query result for the provided page as well as a token to the next page if there are more
results to retrieve for the query.
"""
paginator = self.athena_client.get_paginator('get_query_results')
# The first page of response contains header. Increase the page size for a first page and then
# remove header so that all the pages would have the same size.
if starting_token:
skip_header = False
else:
page_size += 1
skip_header = True
max_items = page_size * 2
pagination_config = {
'MaxItems': min(max_items, MAXIMUM_ALLOWED_ITEMS_NUMBER),
'PageSize': min(page_size, MAXIMUM_ALLOWED_ITEMS_NUMBER)
}
if starting_token:
pagination_config['StartingToken'] = starting_token
response_iterator = paginator.paginate(QueryExecutionId=execution_id, PaginationConfig=pagination_config)
iterator_index = 0
results = EMPTY_ATHENA_RESPONSE
next_token = None
# Retrieve only a single page and return the next token for the caller to iterate the response.
for page in response_iterator:
if iterator_index > 0:
if len(page['ResultSet']['Rows']) == 0:
next_token = None
break
next_token = page.get('NextToken')
results = page
iterator_index += 1
# ... process and return results
Run Code Online (Sandbox Code Playgroud)