主要问题:我正在尝试创建 BigQuery 表(如果不存在)。
方法:使用 BigQueryTableSensor 检查表是否存在,并根据返回值,使用 BigQueryCreateEmptyTableOperator 创建或不创建新表。
问题:我无法使用 xcom 获取 BigQueryTableSensor 传感器的返回值。众所周知,poke 方法需要返回一个布尔值。
这就是我创建任务的方式:
check_if_table_exists = BigQueryTableSensor(
task_id='check_if_table_exists',
project_id='my_project',
dataset_id='my_dataset',
table_id='my_table',
bigquery_conn_id='bigquery_default',
timeout=120,
do_xcom_push=True,
)
# Output: INFO - Success criteria met. Exiting.
get_results = BashOperator(
task_id='get_results',
bash_command="echo {{ ti.xcom_pull(task_ids='check_if_table_exists') }}"
)
# Output: INFO - Running command: echo None
Run Code Online (Sandbox Code Playgroud)
查看 Airflow 界面,我检查了 BigQueryTableSensor 没有推送任何内容:(
题:
有没有办法获得我的传感器的返回值?
有没有更好的方法来解决我的主要问题?也许使用 BigQueryOperator 和像“CREATE TABLE IF NOT EXISTS”这样的 sql 查询。