How to create a DataFrame from AWS Athena using Boto3's get_query_results method

Niv*_*hen 8 python amazon-web-services dataframe pandas amazon-athena

I'm using AWS Athena to query raw data from S3. Since Athena writes the query output to an S3 output bucket, I used to do:

df = pd.read_csv(OutputLocation)

But this seems like an expensive approach. Recently I noticed that boto3's get_query_results method returns a complex dictionary of the results.

client = boto3.client('athena')
response = client.get_query_results(
        QueryExecutionId=res['QueryExecutionId']
        )

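I'm facing two main problems:

  1. How can I format the results of get_query_results into a pandas DataFrame?
  2. get_query_results returns only 1,000 rows. How can I use it to get two million rows?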

Ham*_*ani 11

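A very simple solution is a list comprehension over the results returned by the boto3 Athena paginator. The list comprehension can then be passed straight to pd.DataFrame() to create a DataFrame: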

pd.DataFrame([[data.get('VarCharValue') for data in row['Data']] for row in
              results['ResultSet']['Rows']])

Boto3 Athena to Pandas DataFrame

import pandas as pd
import boto3

result = get_query_results( . . . ) # your code here

def cleanQueryResult(result) :
    '''
    This will take the dictionary of the raw Boto3 Athena results and turn it into a 
    2D array for further processing

    Parameters
    ----------
    result dict
        The dictionary from the boto3 Athena client function get_query_results

    Returns
    -------
    list(list())
        2D list which is essentially the table result. The first row is the column name.
    '''
    return [[data.get('VarCharValue') for data in row['Data']]
            for row in result['ResultSet']['Rows']]

# note that row 1 is the header
df = pd.DataFrame(cleanQueryResult(result))

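As a minimal sketch of the same idea that also separates the header row from the data: the function below assumes the response comes from a SELECT query, where the first row repeats the column names. The name `split_header_and_rows` and the sample response are made up for illustration and are not part of boto3.

```python
def split_header_and_rows(result):
    """Return (columns, rows) from one get_query_results response dict.

    Assumes a SELECT query, where the first row holds the column names.
    NULL cells arrive as empty dicts, so .get() yields None for them.
    """
    table = [[cell.get('VarCharValue') for cell in row['Data']]
             for row in result['ResultSet']['Rows']]
    return table[0], table[1:]


# Hand-written sample in the documented response shape (not real query output)
sample = {
    'ResultSet': {
        'Rows': [
            {'Data': [{'VarCharValue': 'id'}, {'VarCharValue': 'name'}]},
            {'Data': [{'VarCharValue': '1'}, {'VarCharValue': 'alice'}]},
            {'Data': [{'VarCharValue': '2'}, {}]},
        ]
    }
}

columns, rows = split_header_and_rows(sample)
print(columns)
print(rows)
```

The pair can then be handed to pandas as pd.DataFrame(rows, columns=columns). Every value is still a string at this point, because Athena returns each cell as a VarCharValue.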

Millions of results

This requires a paginator object: https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/athena.html#paginators

As a hint, here is how to append the contents of each page:

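# DataFrame.append was removed in pandas 2.0 and never modified df in place; use pd.concat and reassign
df = pd.concat([df, pd.DataFrame(cleanQueryResult(next_page))], ignore_index=True)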
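The hint above can be sketched a little further. The generator below walks any iterable of get_query_results pages and drops the header row once, on the first page only. It assumes a SELECT query; `iter_data_rows` and `_page` are hypothetical helper names, and the fake pages stand in for what `client.get_paginator('get_query_results').paginate(QueryExecutionId=...)` would yield.

```python
def iter_data_rows(pages):
    """Yield data rows (lists of str or None) from get_query_results pages.

    `pages` is any iterable of response dicts, such as the iterator returned
    by a boto3 paginator. Only the first page starts with a header row.
    """
    for page_number, page in enumerate(pages):
        rows = page['ResultSet']['Rows']
        if page_number == 0:
            rows = rows[1:]  # drop the header row once
        for row in rows:
            yield [cell.get('VarCharValue') for cell in row['Data']]


def _page(*rows):
    # Build a fake page for the offline demo below
    return {'ResultSet': {'Rows': [
        {'Data': [{'VarCharValue': value} for value in row]} for row in rows]}}


fake_pages = [_page(['id'], ['1'], ['2']), _page(['3'])]
all_rows = list(iter_data_rows(fake_pages))
print(all_rows)
```

Collecting all rows into a list first and building the DataFrame once at the end avoids copying the frame on every page.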

  • To set the header correctly, you can do `clean_result = cleanQueryResult(result); df = pd.DataFrame(clean_result[1:], columns=clean_result[0])` (5 upvotes)

Eri*_*let 9

get_query_results returns only 1,000 rows. How can I use it to get two million rows into a Pandas DataFrame?

If you try to add:

client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000)

you will get the following error:

An error occurred (InvalidRequestException) when calling the GetQueryResults operation: MaxResults is more than maximum allowed length 1000.
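The 1,000 limit applies to each call, not to the whole query, so the remaining rows can still be reached by following NextToken. The sketch below shows that loop under stated assumptions: `fetch_all_rows` is a hypothetical helper, and `FakeAthenaClient` is a stand-in written only so the loop can run offline; with a real client you would pass `boto3.client('athena')` instead.

```python
def fetch_all_rows(client, query_execution_id, page_size=1000):
    """Collect every row by following NextToken; page_size must be <= 1000."""
    rows = []
    kwargs = {'QueryExecutionId': query_execution_id, 'MaxResults': page_size}
    while True:
        response = client.get_query_results(**kwargs)
        rows.extend(response['ResultSet']['Rows'])
        token = response.get('NextToken')
        if not token:
            return rows
        kwargs['NextToken'] = token


class FakeAthenaClient:
    """Offline stand-in that serves numbered rows in pages (not a boto3 class)."""

    def __init__(self, total_rows):
        self.total_rows = total_rows

    def get_query_results(self, QueryExecutionId, MaxResults, NextToken=None):
        start = int(NextToken or 0)
        end = min(start + MaxResults, self.total_rows)
        response = {'ResultSet': {'Rows': [
            {'Data': [{'VarCharValue': str(i)}]} for i in range(start, end)]}}
        if end < self.total_rows:
            response['NextToken'] = str(end)
        return response


rows = fetch_all_rows(FakeAthenaClient(total_rows=2500), 'hypothetical-id')
print(len(rows))
```

With a real SELECT query the first collected row is the header. For millions of rows this means thousands of API calls.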

You can get millions of rows if you fetch the file directly from the S3 bucket (into a Pandas DataFrame in the following example):

def obtain_data_from_s3(self):
    self.resource = boto3.resource('s3', 
                          region_name = self.region_name, 
                          aws_access_key_id = self.aws_access_key_id,
                          aws_secret_access_key= self.aws_secret_access_key)

    response = self.resource \
    .Bucket(self.bucket) \
    .Object(key= self.folder + self.filename + '.csv') \
    .get()

    return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')   

self.filename can be:

# no '.csv' here: the extension is appended when the object key is built above
self.filename = response['QueryExecutionId']

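This works because Athena names the result file after the QueryExecutionId. Here is all the code needed to run a query and return a DataFrame containing every row and column.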

import time
import boto3
import pandas as pd
import io

class QueryAthena:

    def __init__(self, query, database):
        self.database = database
        self.folder = 'my_folder/'
        self.bucket = 'my_bucket'
        self.s3_input = 's3://' + self.bucket + '/my_folder_input'
        self.s3_output =  's3://' + self.bucket + '/' + self.folder
        self.region_name = 'us-east-1'
        self.aws_access_key_id = "my_aws_access_key_id"
        self.aws_secret_access_key = "my_aws_secret_access_key"
        self.query = query

    def load_conf(self, q):
        try:
            self.client = boto3.client('athena', 
                              region_name = self.region_name, 
                              aws_access_key_id = self.aws_access_key_id,
                              aws_secret_access_key= self.aws_secret_access_key)
            response = self.client.start_query_execution(
                QueryString = q,
                    QueryExecutionContext={
                    'Database': self.database
                    },
                    ResultConfiguration={
                    'OutputLocation': self.s3_output,
                    }
            )
            self.filename = response['QueryExecutionId']
            print('Execution ID: ' + response['QueryExecutionId'])

        except Exception as e:
            print(e)
            raise  # re-raise: without a response there is nothing to return
        return response

    def run_query(self):
        queries = [self.query]
        for q in queries:
            res = self.load_conf(q)
        try:              
            query_status = None
            while query_status in (None, 'QUEUED', 'RUNNING'):
                query_status = self.client.get_query_execution(QueryExecutionId=res["QueryExecutionId"])['QueryExecution']['Status']['State']
                print(query_status)
                if query_status in ('FAILED', 'CANCELLED'):
                    raise Exception('Athena query with the string "{}" failed or was cancelled'.format(self.query))
                if query_status != 'SUCCEEDED':
                    time.sleep(10)  # wait before polling again; skip the delay once the query is done
            print('Query "{}" finished.'.format(self.query))

            df = self.obtain_data()
            return df

        except Exception as e:
            print(e)      

    def obtain_data(self):
        try:
            self.resource = boto3.resource('s3', 
                                  region_name = self.region_name, 
                                  aws_access_key_id = self.aws_access_key_id,
                                  aws_secret_access_key= self.aws_secret_access_key)

            response = self.resource \
            .Bucket(self.bucket) \
            .Object(key= self.folder + self.filename + '.csv') \
            .get()

            return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')   
        except Exception as e:
            print(e)  


if __name__ == "__main__":       
    query = "SELECT * FROM bucket.folder"
    qa = QueryAthena(query=query, database='myAthenaDb')
    dataframe = qa.run_query()
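The class above rebuilds the object key from self.folder and the execution ID. As a hedged alternative, the location can be read back from Athena itself: the get_query_execution response carries the result file's S3 URI under QueryExecution, ResultConfiguration, OutputLocation. The sketch below only parses that URI; `split_s3_uri` and `result_location` are hypothetical helper names, and the sample response is hand-written with placeholder values.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/some/key.csv' into ('bucket', 'some/key.csv')."""
    parsed = urlparse(uri)
    if parsed.scheme != 's3' or not parsed.netloc:
        raise ValueError('not an S3 URI: {!r}'.format(uri))
    return parsed.netloc, parsed.path.lstrip('/')


def result_location(query_execution_response):
    """Return (bucket, key) of the result file from a get_query_execution response."""
    location = (query_execution_response['QueryExecution']
                ['ResultConfiguration']['OutputLocation'])
    return split_s3_uri(location)


# Hand-written response fragment; bucket, folder and ID are placeholders
sample = {'QueryExecution': {'ResultConfiguration': {
    'OutputLocation': 's3://my_bucket/my_folder/abc-123.csv'}}}

bucket, key = result_location(sample)
print(bucket, key)
```

The resulting bucket and key can be passed to .Bucket(bucket).Object(key=key).get() in the same way obtain_data does, which removes the need to keep the folder name and the output location in sync by hand.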


Niv*_*hen 6

I have a solution to my first question, using the following function:

def results_to_df(results):

    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]

    listed_results = []
    for res in results['ResultSet']['Rows'][1:]:  # skip the header row
        values = []
        for field in res['Data']:
            # NULL cells arrive as empty dicts, so fall back to None
            values.append(field.get('VarCharValue'))

        listed_results.append(
            dict(zip(columns, values))
        )

    return listed_results

Then:

t = results_to_df(response)
pd.DataFrame(t)
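One caveat with this approach is that every cell comes back as a string. The same ColumnInfo metadata that supplies the labels also carries a Type field, so values can be cast while the records are built. The sketch below is one possible way to do that; `typed_records` is a hypothetical name, and the converter table is an assumption that covers only a few common Athena type names and leaves everything else as strings.

```python
# Assumed mapping from Athena type names to Python converters; types not
# listed here (varchar, date, timestamp, ...) are left as strings.
CONVERTERS = {
    'tinyint': int, 'smallint': int, 'integer': int, 'bigint': int,
    'float': float, 'double': float,
    'boolean': lambda text: text.lower() == 'true',
}


def typed_records(results):
    """Turn one get_query_results response into a list of typed dicts."""
    column_info = results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    names = [col['Label'] for col in column_info]
    casts = [CONVERTERS.get(col['Type'], str) for col in column_info]

    records = []
    for row in results['ResultSet']['Rows'][1:]:  # skip the header row
        values = []
        for cast, cell in zip(casts, row['Data']):
            raw = cell.get('VarCharValue')  # None for NULL cells
            values.append(None if raw is None else cast(raw))
        records.append(dict(zip(names, values)))
    return records


# Hand-written sample in the documented response shape (not real query output)
sample = {'ResultSet': {
    'ResultSetMetadata': {'ColumnInfo': [
        {'Label': 'id', 'Type': 'integer'},
        {'Label': 'price', 'Type': 'double'},
        {'Label': 'active', 'Type': 'boolean'},
    ]},
    'Rows': [
        {'Data': [{'VarCharValue': 'id'}, {'VarCharValue': 'price'}, {'VarCharValue': 'active'}]},
        {'Data': [{'VarCharValue': '1'}, {'VarCharValue': '9.5'}, {'VarCharValue': 'true'}]},
        {'Data': [{'VarCharValue': '2'}, {}, {'VarCharValue': 'false'}]},
    ],
}}

records = typed_records(sample)
print(records)
```

The list of dicts can be passed to pd.DataFrame(records) exactly like the output of results_to_df.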

As for my second question, and at @EricBellet's request, I've also added a pagination approach, which I found inefficient and slower than loading the Athena output from S3:

def run_query(query, database, s3_output):
    ''' 
    Function for executing Athena queries and return the query ID 
    '''
    client = boto3.client('athena')
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
            },
        ResultConfiguration={
            'OutputLocation': s3_output,
            }
        )
    print('Execution ID: ' + response['QueryExecutionId'])
    return response



def format_result(results):
    '''
    Format one page of results as a list of dicts, ready to be appended.
    '''
    columns = [
        col['Label']
        for col in results['ResultSet']['ResultSetMetadata']['ColumnInfo']
    ]

    formatted_results = []

    # keeps every row; on the first page this includes the header row
    for result in results['ResultSet']['Rows']:
        values = []
        for field in result['Data']:
            # NULL cells arrive as empty dicts, so fall back to None
            values.append(field.get('VarCharValue'))

        formatted_results.append(
            dict(zip(columns, values))
        )
    return formatted_results



res = run_query(query_2, database, s3_output) #query Athena



import time
import boto3
import pandas as pd

client = boto3.client('athena')
query_id = res['QueryExecutionId']
start_time = time.time()

# The paginator follows NextToken on its own, so no manual marker loop is needed
paginator = client.get_paginator('get_query_results')
response_iterator = paginator.paginate(
    QueryExecutionId=query_id,
    PaginationConfig={'PageSize': 1000})

pages = []
for i, page in enumerate(response_iterator):
    format_page = format_result(page)
    if i == 0:
        format_page = format_page[1:]  # the first page starts with the header row
    pages.append(pd.DataFrame(format_page))

# DataFrame.append was removed in pandas 2.0; concatenate once at the end instead
formatted_results = pd.concat(pages, ignore_index=True)

print("My program took", time.time() - start_time, "to run")
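The pagination code assumes the query has already finished, and get_query_results fails for a query that is still queued or running. A minimal polling sketch, under stated assumptions, is shown below: `wait_for_query` is a hypothetical helper, its default interval and timeout are arbitrary, and `FakeClient` exists only so the loop can run without AWS.

```python
import time


def wait_for_query(client, query_execution_id, poll_seconds=2.0, timeout=600.0):
    """Poll get_query_execution until the query succeeds, fails or times out."""
    deadline = time.monotonic() + timeout
    while True:
        response = client.get_query_execution(QueryExecutionId=query_execution_id)
        state = response['QueryExecution']['Status']['State']
        if state == 'SUCCEEDED':
            return state
        if state in ('FAILED', 'CANCELLED'):
            raise RuntimeError(
                'query {} ended in state {}'.format(query_execution_id, state))
        if time.monotonic() >= deadline:
            raise TimeoutError(
                'query {} still {} after {}s'.format(query_execution_id, state, timeout))
        time.sleep(poll_seconds)


class FakeClient:
    """Offline stand-in that replays a fixed sequence of states (not a boto3 class)."""

    def __init__(self, states):
        self._states = iter(states)

    def get_query_execution(self, QueryExecutionId):
        return {'QueryExecution': {'Status': {'State': next(self._states)}}}


final_state = wait_for_query(
    FakeClient(['QUEUED', 'RUNNING', 'SUCCEEDED']), 'hypothetical-id', poll_seconds=0)
print(final_state)
```

In the code above it would be called as wait_for_query(client, res['QueryExecutionId']) right after run_query and before the paginator is created.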

The formatting isn't great, but I think it does the job...


Cem*_*Cem 5

You can use AWS Data Wrangler to create a Pandas DataFrame directly from an Athena query.

import awswrangler as wr  
df = wr.athena.read_sql_query(sql="SELECT * FROM <table_name_in_Athena>", database="<database_name>")

You can find more information here.

  • Excellent library; this really is the best answer for 2021 (2 upvotes)