如何在 Apache Airflow 中查询 Google Big Query 并将结果作为 Pandas Dataframe 返回?

Sru*_*ule 5 google-bigquery airflow

我正在尝试将 bigquery 查询保存到自定义 Airflow 运算符中的数据帧。

我尝试过使用 airflow.contrib.hooks.bigquery_hook 和 get_pandas_df 方法。该任务卡在身份验证上,因为它希望我手动访问 url 进行身份验证。

因此,我对身份验证进行了硬编码。这可行,但绝对不理想。

工作但不理想(凭证是硬编码的):

def execute(self, context):
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'my-file-location.json'
        client = bigquery.Client()

        job_config = bigquery.QueryJobConfig()

        df = client.query(
            self.query,
            location="US",
            job_config=job_config,).to_dataframe()
Run Code Online (Sandbox Code Playgroud)

不工作:

def execute(self, context):
    bq  = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, delegate_to=None,use_legacy_sql=True, location='US')
    df = bq.get_pandas_df(self.query)
Run Code Online (Sandbox Code Playgroud)

这段代码在验证时卡住了。以下是日志: [2019-06-19 12:56:05,526] {logging_mixin.py:95} 信息 - 请访问此 URL 以授权此应用程序。

小智 5

不知怎的,我无法让BigQueryPandasConnector工作。我最终得到的是使用 BigQueryHook 的凭据,使用 BigQuery 的官方 Python 客户端创建一个正常的bigquery.client.Client

这是一个例子:

from google.cloud import bigquery

bq_hook = BigQueryHook(bigquery_conn_id=bigquery_conn_id, use_legacy_sql=False)
bq_client = bigquery.Client(project = bq_hook._get_field("project"), credentials = bq_hook._get_credentials())
df = bq_client.query(sql).to_dataframe()
Run Code Online (Sandbox Code Playgroud)


Olu*_*ule 0

似乎没有为挂钩指定服务帐户或密钥路径。

这是设置 GCP 连接的指南。 https://github.com/apache/airflow/blob/1.10.3/docs/howto/connection/gcp.rst

AIRFLOW_CONN_BIGQUERY_DEFAULT在气流配置文件中

key_path如果凭证在气流进程可访问的路径中可用, 您可以采用使用查询参数的方式。
否则,将key_dict查询参数设置为凭据文件的 URL 编码 JSON 内容。

AIRFLOW_CONN_BIGQUERY_DEFAULT=google-cloud-platform://?extra__google_cloud_platform__key_path=%2Fkeys%2Fkey.json&extra__google_cloud_platform__scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform&extra__google_cloud_platform__project=airflow&extra__google_cloud_platform__num_retries=5
Run Code Online (Sandbox Code Playgroud)