Sru*_*ule 5 google-bigquery airflow
我正在尝试将 bigquery 查询保存到自定义 Airflow 运算符中的数据帧。
我尝试过使用 airflow.contrib.hooks.bigquery_hook 和 get_pandas_df 方法。该任务卡在身份验证上,因为它希望我手动访问 url 进行身份验证。
因此,我对身份验证进行了硬编码。这可行,但绝对不理想。
工作但不理想(凭证是硬编码的):
def execute(self, context):
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'my-file-location.json'
client = bigquery.Client()
job_config = bigquery.QueryJobConfig()
df = client.query(
self.query,
location="US",
job_config=job_config,).to_dataframe()
Run Code Online (Sandbox Code Playgroud)
不工作:
def execute(self, context):
bq = BigQueryHook(bigquery_conn_id=self.gcp_conn_id, delegate_to=None,use_legacy_sql=True, location='US')
df = bq.get_pandas_df(self.query)
Run Code Online (Sandbox Code Playgroud)
这段代码在验证时卡住了。以下是日志: [2019-06-19 12:56:05,526] {logging_mixin.py:95} 信息 - 请访问此 URL 以授权此应用程序。
小智 5
不知怎的,我无法让BigQueryPandasConnector工作。我最终得到的是使用 BigQueryHook 的凭据,使用 BigQuery 的官方 Python 客户端创建一个正常的bigquery.client.Client。
这是一个例子:
from google.cloud import bigquery
bq_hook = BigQueryHook(bigquery_conn_id=bigquery_conn_id, use_legacy_sql=False)
bq_client = bigquery.Client(project = bq_hook._get_field("project"), credentials = bq_hook._get_credentials())
df = bq_client.query(sql).to_dataframe()
Run Code Online (Sandbox Code Playgroud)
似乎没有为挂钩指定服务帐户或密钥路径。
这是设置 GCP 连接的指南。 https://github.com/apache/airflow/blob/1.10.3/docs/howto/connection/gcp.rst
放AIRFLOW_CONN_BIGQUERY_DEFAULT在气流配置文件中
key_path如果凭证在气流进程可访问的路径中可用,
您可以采用使用查询参数的方式。
否则,将key_dict查询参数设置为凭据文件的 URL 编码 JSON 内容。
AIRFLOW_CONN_BIGQUERY_DEFAULT=google-cloud-platform://?extra__google_cloud_platform__key_path=%2Fkeys%2Fkey.json&extra__google_cloud_platform__scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform&extra__google_cloud_platform__project=airflow&extra__google_cloud_platform__num_retries=5
Run Code Online (Sandbox Code Playgroud)