python google-bigquery airflow
I am trying to fetch the results of a BigQueryOperator in Airflow, but I cannot find a way to do it. I tried calling the next() method on the bq_cursor member (available in 1.10), but it returns None. This is how I tried to do it:
import datetime
import logging

from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time()
)

def MyChequer(**kwargs):
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='select count(*) from mydataset.mytable'
    )
    big_query_count.execute(context=kwargs)
    logging.info(big_query_count)
    logging.info(big_query_count.__dict__)
    logging.info(big_query_count.bq_cursor.next())

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'project_id': 'myproject'
}

with models.DAG(
        'bigquery_results_execution',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    myoperator = python_operator.PythonOperator(
        task_id='threshold_operator',
        provide_context=True,
        python_callable=MyChequer
    )

    # Define DAG
    myoperator
Looking at bigquery_hook.py and bigquery_operator.py, it seems to be the only available way to fetch the results.
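For context, the bq_cursor in the hook follows the standard DB-API cursor pattern. A minimal stand-in sketch of the flow I expected, using sqlite3 from the standard library in place of the BigQuery connection (the table and data are made up for illustration):

```python
import sqlite3

# sqlite3 stands in for the BigQuery connection here; BigQueryHook's
# cursor follows the same DB-API pattern (execute, then fetchone/fetchall).
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE mytable (id INTEGER)")
cursor.executemany("INSERT INTO mytable VALUES (?)", [(1,), (2,), (3,)])

# The same count query as in the DAG above, against the stand-in table.
cursor.execute("select count(*) from mytable")
count = cursor.fetchone()[0]
print(count)
```

With BigQuery this is exactly what cursor.execute(sql) followed by a fetch should give back; the question is why next() returns None instead.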
Whenever I need to fetch data from a BigQuery query and use it for something, I create my own operator using the BigQuery hook. I usually call this a BigQueryToXOperator, and we have a bunch of them for shipping BigQuery data to other internal systems.

For example, I have a BigQueryToPubSub operator that you might find useful as an example of how to query BigQuery and then handle the results row by row, sending them to Google PubSub. Consider the following generalized sample code for how to do this yourself:
from json import dumps

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BigQueryToXOperator(BaseOperator):

    template_fields = ['sql']
    ui_color = '#000000'

    @apply_defaults
    def __init__(
            self,
            sql,
            keys,
            bigquery_conn_id='bigquery_default',
            delegate_to=None,
            *args,
            **kwargs):
        super(BigQueryToXOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.keys = keys  # A list of keys for the columns in the result set of sql
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to

    def execute(self, context):
        """
        Run query and handle results row by row.
        """
        cursor = self._query_bigquery()
        for row in cursor.fetchall():
            # Zip keys and row together because the cursor returns a list of lists (not a list of dicts)
            row_dict = dumps(dict(zip(self.keys, row))).encode('utf-8')
            # Do what you want with the row...
            handle_row(row_dict)

    def _query_bigquery(self):
        """
        Queries BigQuery and returns a cursor to the results.
        """
        bq = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                          use_legacy_sql=False)
        conn = bq.get_conn()
        cursor = conn.cursor()
        cursor.execute(self.sql)
        return cursor
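The core transformation in execute() is independent of Airflow: pair each column key with its value from the row list, then JSON-encode. A standalone sketch of just that step, with hypothetical keys and rows in place of real cursor output:

```python
from json import dumps

# Hypothetical column keys and a cursor-style result set (a list of lists,
# as fetchall() returns), standing in for a real BigQuery response.
keys = ['name', 'clicks']
rows = [['alice', 10], ['bob', 7]]

encoded = []
for row in rows:
    # Same transformation as in BigQueryToXOperator.execute():
    # zip keys with column values, build a dict, JSON-encode as bytes.
    row_dict = dumps(dict(zip(keys, row))).encode('utf-8')
    encoded.append(row_dict)

print(encoded[0])
```

Each element of encoded is then ready to hand to whatever downstream system the operator targets (PubSub messages, in my case).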
Thanks to @kaxil and @Mike for their answers. I found the problem. There is a kind of bug (in my opinion) in BigQueryCursor. As part of run_with_configuration, the running_job_id is returned but never assigned to job_id, which is what the next method uses to pull the results. A workaround (not very elegant, but good if you don't want to re-implement everything) is to assign job_id from running_job_id on the hook, like this:
big_query_count.execute(context=kwargs)
#workaround
big_query_count.bq_cursor.job_id = big_query_count.bq_cursor.running_job_id
logging.info(big_query_count.bq_cursor.next())
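To make the failure mode concrete, here is an illustrative mock (not the real Airflow classes) of the behavior described above: run_with_configuration records the started job in running_job_id but never copies it into job_id, so next() has nothing to page through until the workaround assignment is applied. The class and job id below are invented for illustration:

```python
class FakeBigQueryCursor:
    """Illustrative mock of the described BigQueryCursor behavior."""

    def __init__(self):
        self.job_id = None
        self.running_job_id = None

    def run_with_configuration(self, configuration):
        # The real hook starts a job and remembers it here...
        self.running_job_id = 'job_123'
        # ...but never assigns it to self.job_id.
        return self.running_job_id

    def next(self):
        # next() can only fetch results when job_id is set.
        if self.job_id is None:
            return None
        return 'row-from-%s' % self.job_id


cursor = FakeBigQueryCursor()
cursor.run_with_configuration({})
print(cursor.next())  # None - the bug

cursor.job_id = cursor.running_job_id  # the workaround
print(cursor.next())
```

Once job_id is populated, next() behaves as expected, which is why the one-line assignment after execute() is enough to unblock result fetching.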