小编Jas*_*ith的帖子

使用插件导入DAG时出现气流错误 - 只能在操作员之间设置关系

我编写了一个气流插件,它只包含一个自定义运算符(以支持BigQuery中的CMEK).我可以使用单个任务创建一个简单的DAG,该任务使用此运算符并且执行正常.

但是,如果我尝试在DAG中从DummyOperator任务创建依赖关系到我的自定义操作员任务,则DAG无法在UI中加载并抛出以下错误,我无法理解为什么会抛出此错误?

破坏的DAG:[/home/airflow/gcs/dags/js_bq_custom_plugin_v2.py]关系只能在运营商之间设置; 收到BQCMEKOperator

到目前为止,我已经在composer-1.4.2-airflow-1.9.0,composer-1.4.2-airflow-1.10.0和composer-1.4.1-airflow-1.10.0上进行了测试.

每个任务的运行气流测试都可以顺利完成.

在DAG中单独使用它可以正常工作(如下所示)所以我不相信插件本身存在任何错误

import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator


default_dag_args = {
    'start_date': datetime.datetime(2019,1,1),
    'retries': 0
}


dag = DAG(
    'js_bq_custom_plugin',
    schedule_interval=None,
    catchup=False,
    concurrency=1,
    max_active_runs=1,
    default_args=default_dag_args)

run_this = BQCMEKOperator(
    task_id     = 'cmek_plugin_test',
    sql         = 'select * from ds.foo LIMIT 15',
    project     = 'xxx',
    dataset     = 'js_dev',
    table       = 'cmek_test10',
    key         = 'xxx',
    dag     = dag
)
Run Code Online (Sandbox Code Playgroud)

然而,如果我引入DummyOperator和依赖项,则会发生错误

import datetime
import logging
from airflow.models import DAG
from airflow.operators.bq_cmek_plugin import BQCMEKOperator
from …
Run Code Online (Sandbox Code Playgroud)

airflow google-cloud-composer

6
推荐指数
1
解决办法
1183
查看次数

标签 统计

airflow ×1

google-cloud-composer ×1