Jon*_*ira 4 python airflow apache-airflow-xcom
主要问题:我正在尝试创建 BigQuery 表(如果不存在)。
方法:使用 BigQueryTableSensor 检查表是否存在,并根据返回值,使用 BigQueryCreateEmptyTableOperator 创建或不创建新表。
问题:我无法使用 xcom 获取 BigQueryTableSensor 传感器的返回值。众所周知,poke 方法需要返回一个布尔值。
这就是我创建任务的方式:
check_if_table_exists = BigQueryTableSensor(
task_id='check_if_table_exists',
project_id='my_project',
dataset_id='my_dataset',
table_id='my_table',
bigquery_conn_id='bigquery_default',
timeout=120,
do_xcom_push=True,
)
# Output: INFO - Success criteria met. Exiting.
get_results = BashOperator(
task_id='get_results',
bash_command="echo {{ ti.xcom_pull(task_ids='check_if_table_exists') }}"
)
# Output: INFO - Running command: echo None
Run Code Online (Sandbox Code Playgroud)
查看 Airflow 界面,我检查了 BigQueryTableSensor 没有推送任何内容:(
题:
有没有办法获得我的传感器的返回值?
有没有更好的方法来解决我的主要问题?也许使用 BigQueryOperator 和像“CREATE TABLE IF NOT EXISTS”这样的 sql 查询。
是的,这是可能的,我让它像这样工作:
class MyCustomSensor(BaseSensorOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(MyCustomSensor, self).__init__(*args, **kwargs)
def poke(self, context):
application_id = context['ti'].xcom_pull(key='application_id')
print("We found " + application_id)
return True
Run Code Online (Sandbox Code Playgroud)
这是一个完整的 DAG 示例:
import os
import sys
from datetime import datetime
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
dag = DAG('my_dag_name',
description='DAG ',
schedule_interval=None,
start_date=datetime(2021, 1, 7),
tags=["samples"],
catchup=False)
class MyCustomSensor(BaseSensorOperator):
@apply_defaults
def __init__(self,
*args,
**kwargs):
super(MyCustomSensor, self).__init__(*args, **kwargs)
def poke(self, context):
application_id = context['ti'].xcom_pull(key='application_id')
print("We found " + application_id)
return True
def launch_spark_job(**kwargs):
application_id = "application_1613995447156_11473"
kwargs['ti'].xcom_push(key='application_id', value=application_id)
launch_spark_job_op = PythonOperator(task_id='test_python',
python_callable=launch_spark_job,
provide_context=True,
dag=dag)
wait_spark_job_sens = MyCustomSensor(task_id='wait_spark_job',
dag=dag,
mode="reschedule")
launch_spark_job_op >> wait_spark_job_sens
Run Code Online (Sandbox Code Playgroud)
这不是传感器的用例。传感器使工作流程等待某些事情发生。在您的情况下BigQueryTableSensor,将等到其他进程创建表,然后才继续执行下游任务。
您正在寻找的是:
BigQueryCheckOperator运行返回布尔值的查询(如果表存在则为 True,否则为 False),那么您将能够从BashOperator.BranchSQLOperator),其中工作流分支基于 SQL 查询的结果来检查表是否存在。在该选项中,无需使用 XCOM。| 归档时间: |
|
| 查看次数: |
427 次 |
| 最近记录: |