相关疑难解决方法(0)

在气流中创建子标签时访问父 dag 上下文?

我试图在 subdag 创建时访问来自父 dag 的一些 xcom 数据,我正在寻找在互联网上实现这一目标,但我没有找到任何东西。

def test(task_id):
    logging.info(f' execution of task {task_id}')


def load_subdag(parent_dag_id, child_dag_id, args):
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_id, child_dag_id),
        default_args=args,
        schedule_interval="@daily",
    )
    with dag_subdag:
        r = DummyOperator(task_id='random')

        for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
            t = PythonOperator(
                task_id='load_subdag_{0}'.format(i),
                default_args=args,
                python_callable=print_context,
                op_kwargs={'task_id': 'load_subdag_{0}'.format(i)},
                dag=dag_subdag,
            )

    return dag_subdag

load_tasks = SubDagOperator(
        task_id='load_tasks',
        subdag=load_subdag(dag.dag_id,
                           'load_tasks', args),
        default_args=args,
    )
Run Code Online (Sandbox Code Playgroud)

我的代码出现此错误

1  | Traceback (most recent call last):
airflow_1  |   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
airflow_1  |     m = imp.load_source(mod_name, filepath) …
Run Code Online (Sandbox Code Playgroud)

airflow apache-airflow-xcom

6
推荐指数
1
解决办法
5230
查看次数

如何动态嵌套Airflow DAG?

我有一个由三个运算符组成的简单DAG。第一个是PythonOperator我们自己的功能,另外两个是标准的运营商从airflow.contribFileToGoogleCloudStorageOperatorGoogleCloudStorageToBigQueryOperator要准确)。它们按顺序工作。根据参数,我们的自定义任务会生成许多文件,通常在2到5之间。所有这些文件都必须由后续任务分别处理。这意味着我想要几个下游分支,但在运行DAG之前确切知道有多少个分支。

您将如何解决这个问题?

更新:

使用BranchPythonOperator他在另一个答复中提到的jhnclvr 作为出发点,我创建了一个根据条件跳过或继续执行分支的运算符。该方法仅是可行的,因为已知最大可能的分支数并且足够小。

运营商:

class SkipOperator(PythonOperator):
    def execute(self, context):
        boolean = super(SkipOperator, self).execute(context)
        session = settings.Session()
        for task in context['task'].downstream_list:
            if boolean is False:
                ti = TaskInstance(
                    task, execution_date=context['ti'].execution_date)
                ti.state = State.SKIPPED
                ti.start_date = datetime.now()
                ti.end_date = datetime.now()
                session.merge(ti)
        session.commit()
        session.close()
Run Code Online (Sandbox Code Playgroud)

用法:

def check(i, templates_dict=None, **kwargs):
    return len(templates_dict["data_list"].split(",")) > i

dag = DAG(
    dag_name,
    default_args=default_args,
    schedule_interval=None
)

load = CustomOperator(
    task_id="load_op",
    bash_command=' '.join([
        './command.sh'
        '--data-list {{ …
Run Code Online (Sandbox Code Playgroud)

airflow

5
推荐指数
1
解决办法
1665
查看次数

运行时添加到 DAG 的任务无法调度

我的想法是有一个foo生成输入列表(用户、报告、日志文件等)的任务,并为输入列表中的每个元素启动一个任务。目标是利用 Airflow 的重试和其他逻辑,而不是重新实现它。

因此,理想情况下,我的 DAG 应如下所示: 在此处输入图片说明

这里唯一的变量是生成的任务数量。在完成所有这些任务后,我想再做一些任务,因此为每个任务启动一个新的 DAG 似乎并不合适。

这是我的代码:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1)
}

dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)

foo_operator = BashOperator(
    task_id='foo',
    bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),
    xcom_push=True,
    dag=dag)

def gen_nodes(**kwargs):
    ti = kwargs['ti']
    workers = json.loads(ti.xcom_pull(task_ids='foo'))

    for wid in workers:
        print("Iterating worker %s" % wid)
        op = PythonOperator(
            task_id='test_op_%s' % wid,
            python_callable=lambda: print("Dynamic task!"),
            dag=dag
        )

        op.set_downstream(bar_operator)
        op.set_upstream(dummy_op)

gen_subdag_node_op = PythonOperator(
    task_id='gen_subdag_nodes',
    python_callable=gen_nodes,
    provide_context=True,
    dag=dag
)

gen_subdag_node_op.set_upstream(foo_operator)

dummy_op …
Run Code Online (Sandbox Code Playgroud)

airflow apache-airflow airflow-scheduler

2
推荐指数
1
解决办法
1735
查看次数