Bre*_*the 6 python python-3.x airflow
几个运算符允许提取数据,但我从未设法使用结果。
例如:https : //github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/bigquery_get_data.py
该运算符可以按如下方式调用:
get_data = BigQueryGetDataOperator(
task_id='get_data_from_bq',
dataset_id='test_dataset',
table_id='Transaction_partitions',
max_results='100',
selected_fields='DATE',
bigquery_conn_id='airflow-service-account'
)
Run Code Online (Sandbox Code Playgroud)
然而,get_data 是 DAG 类型,但第 116 行说“返回 table_data”。需要明确的是,操作员工作并检索数据,我只是不明白如何使用数据检索/数据所在的位置。
如何使用上面的“get_data”获取数据?
kax*_*xil 11
您将get_data
在下一个任务中使用的方式可以是一个PythonOperator
,然后您可以使用它来处理数据。
get_data = BigQueryGetDataOperator(
task_id='get_data_from_bq',
dataset_id='test_dataset',
table_id='Transaction_partitions',
max_results='100',
selected_fields='DATE',
bigquery_conn_id='airflow-service-account'
)
def process_data_from_bq(**kwargs):
ti = kwargs['ti']
bq_data = ti.xcom_pull(task_ids='get_data_from_bq')
# Now bq_data here would have your data in Python list
print(bq_data)
process_data = PythonOperator(
task_id='process_data_from_bq',
python_callable=process_bq_data,
provide_context=True
)
get_data >> process_data
Run Code Online (Sandbox Code Playgroud)
PS:我是BigQueryGetDataOperator
Airflow 提交者 / PMC 的作者
归档时间: |
|
查看次数: |
1402 次 |
最近记录: |