标签: airflow-taskflow

Airflow:如何将数据从装饰任务传递到 SimpleHttpOperator?

我最近开始使用 Apache Airflow。我正在将Taskflow APIGet_payload与一个带有 id和 的修饰任务一起使用SimpleHttpOperator。任务Get_payload从数据库获取数据,进行一些数据操作并返回 adict作为有效负载。

问题

无法将数据从上一个任务传递到下一个任务。是的,我知道,XComs但使用 Taskflow API 的全部目的是避免与XComs. get_data当直接传递给data的属性时出现以下错误SimpleHttpOperator

airflow.exceptions.AirflowException: 400:BAD REQUEST
Run Code Online (Sandbox Code Playgroud)

到目前为止我尝试过什么?

正如此 SO 答案中提到的,我template_field在自定义传感器中使用来定义期望来自上一个任务的数据的字段。如果是SimpleHttpOperator操作员,我无法编辑它来执行相同的操作。那么如何类似地解决这个问题SimpleHttpOperator呢?

我已经检查了这个 SO 答案这个

有向无环图:

airflow.exceptions.AirflowException: 400:BAD REQUEST
Run Code Online (Sandbox Code Playgroud)

完整日志:

[2021-08-28 20:28:12,947] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [queued]>
[2021-08-28 20:28:12,970] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: …
Run Code Online (Sandbox Code Playgroud)

airflow airflow-taskflow

6
推荐指数
1
解决办法
2450
查看次数

如何使用 jinja 模板通过任务装饰器访问 Airflow 变量?

我当前正在访问气流变量,如下所示:

from airflow.models import Variable

s3_bucket = Variable.get('bucket_name')
Run Code Online (Sandbox Code Playgroud)

它有效,但我被要求不要使用变量模块并使用 jinja 模板代替(即):

s3_bucket = '{{ var.value.bucket_name }}'
Run Code Online (Sandbox Code Playgroud)

问题是当我在气流模板(例如 PythonOperator/BashOperator)中使用 jinja 时,它可以工作,但我无法让它以任务流 API 形式工作。该变量被读取为字符串文字。例子:

# Pretend DAG defined here

@task
def example_task():
    s3_bucket = '{{ var.value.bucket_name }}'
    print(s3_bucket)

example_task()
Run Code Online (Sandbox Code Playgroud)

上面的代码将打印“{{ var.value.bucket_name }}”而不是bucket_name值。

airflow airflow-taskflow

6
推荐指数
1
解决办法
8092
查看次数

Airflow 任务流 - 并行运行任务

想要尝试新的任务流 API,我需要有 2 个并行任务。

使用 Airflow v1,我习惯做类似的事情

task_1 >> [task_2, task_3]
[task_2, task_3] >> task_4
Run Code Online (Sandbox Code Playgroud)

现在我们调用任务的方式有所不同PythonOperator

如何使用 TaskFlow 制作列表?

谢谢

airflow airflow-taskflow

5
推荐指数
1
解决办法
997
查看次数

如何为 @task 装饰的 Airflow 任务编写单元测试?

我正在尝试为使用Airflow TaskFlow API构建的一些任务编写单元测试。我尝试了多种方法,例如,通过创建 dagrun 或仅运行任务函数,但没有任何帮助。

这是我从 S3 下载文件的任务,还有更多内容,但我在本示例中删除了它。

@task()
def updates_process(files):
    context = get_current_context()
    try:
        updates_file_path = utils.download_file_from_s3_bucket(files.get("updates_file"))
    except FileNotFoundError as e:
        log.error(e)
        return

    # Do something else
Run Code Online (Sandbox Code Playgroud)

现在我试图编写一个测试用例,我可以在其中检查这个 except 子句。以下是我开始的例子

class TestAccountLinkUpdatesProcess(TestCase):
    @mock.patch("dags.delta_load.updates.log")
    @mock.patch("dags.delta_load.updates.get_current_context")
    @mock.patch("dags.delta_load.updates.utils.download_file_from_s3_bucket")
    def test_file_not_found_error(self, download_file_from_s3_bucket, get_current_context, log):
        download_file_from_s3_bucket.side_effect = FileNotFoundError
        task = account_link_updates_process({"updates_file": "path/to/file.csv"})
        get_current_context.assert_called_once()
        log.error.assert_called_once()
Run Code Online (Sandbox Code Playgroud)

我还尝试创建一个 dagrun(如文档中的示例所示)并从 dagrun 获取任务,但这也没有帮助。

python-unittest airflow airflow-taskflow

5
推荐指数
1
解决办法
1773
查看次数

使用 TaskFlowAPI 在 Apache Airflow 中进行分支

我在 Airflow 的 TaskFlowAPI 中找不到分支的文档。我尝试以“Pythonic”方式进行操作,但是当运行时,DAG 没有看到task_2_execute_if_true,无论前一个任务返回的真值如何。

@dag(
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['test'],
)
def my_dag():
    @task()
    def task_1_returns_boolean():
        # evaluate and return boolean value
        return boolean_value
    
    @task()
    def task_2_execute_if_true():
        # do_something...

    outcome_1 = task_1_returns_boolean()
    if outcome_1:
        outcome_2 = task_2_execute_if_true() 


executed = my_dag()
Run Code Online (Sandbox Code Playgroud)

TaskFlowAPI 中正确的分支方式是什么?我应该再添加一个专门用于分支的函数吗?

python airflow airflow-taskflow

2
推荐指数
1
解决办法
1459
查看次数

获取 Airflow TaskFlow 任务中的 dag_run 上下文

我的 dag 是从配置 JSON 开始的:

\n
{"foo" : "bar"}\n
Run Code Online (Sandbox Code Playgroud)\n

我有一个使用这个值的Python运算符:

\n
my_task = PythonOperator(\n    task_id="my_task",\n    op_kwargs={"foo": "{{ dag_run.conf[\'foo\'] }}"},\n    python_callable=lambda foo: print(foo))\n
Run Code Online (Sandbox Code Playgroud)\n

我\xe2\x80\x99d喜欢用TaskFlow任务\xe2\x80\xa6替换它

\n
my_task = PythonOperator(\n    task_id="my_task",\n    op_kwargs={"foo": "{{ dag_run.conf[\'foo\'] }}"},\n    python_callable=lambda foo: print(foo))\n
Run Code Online (Sandbox Code Playgroud)\n

如何从此处获取对 context、dag_run 的引用或以其他方式获取配置 JSON?

\n

airflow airflow-2.x airflow-taskflow

1
推荐指数
1
解决办法
2684
查看次数