我最近开始使用 Apache Airflow。我正在将Taskflow APIGet_payload与一个带有 id和 的修饰任务一起使用SimpleHttpOperator。任务Get_payload从数据库获取数据,进行一些数据操作并返回 adict作为有效负载。
问题
无法将数据从上一个任务传递到下一个任务。是的,我知道,XComs但使用 Taskflow API 的全部目的是避免与XComs. get_data当直接传递给data的属性时出现以下错误SimpleHttpOperator。
airflow.exceptions.AirflowException: 400:BAD REQUEST
Run Code Online (Sandbox Code Playgroud)
到目前为止我尝试过什么?
正如此 SO 答案中提到的,我template_field在自定义传感器中使用来定义期望来自上一个任务的数据的字段。如果是SimpleHttpOperator操作员,我无法编辑它来执行相同的操作。那么如何类似地解决这个问题SimpleHttpOperator呢?
有向无环图:
airflow.exceptions.AirflowException: 400:BAD REQUEST
Run Code Online (Sandbox Code Playgroud)
完整日志:
[2021-08-28 20:28:12,947] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: http_operator.clf_api 2021-08-28T20:28:10.265689+00:00 [queued]>
[2021-08-28 20:28:12,970] {taskinstance.py:903} INFO - Dependencies all met for <TaskInstance: …Run Code Online (Sandbox Code Playgroud) 我当前正在访问气流变量,如下所示:
from airflow.models import Variable
s3_bucket = Variable.get('bucket_name')
Run Code Online (Sandbox Code Playgroud)
它有效,但我被要求不要使用变量模块并使用 jinja 模板代替(即):
s3_bucket = '{{ var.value.bucket_name }}'
Run Code Online (Sandbox Code Playgroud)
问题是当我在气流模板(例如 PythonOperator/BashOperator)中使用 jinja 时,它可以工作,但我无法让它以任务流 API 形式工作。该变量被读取为字符串文字。例子:
# Pretend DAG defined here
@task
def example_task():
s3_bucket = '{{ var.value.bucket_name }}'
print(s3_bucket)
example_task()
Run Code Online (Sandbox Code Playgroud)
上面的代码将打印“{{ var.value.bucket_name }}”而不是bucket_name值。
想要尝试新的任务流 API,我需要有 2 个并行任务。
使用 Airflow v1,我习惯做类似的事情
task_1 >> [task_2, task_3]
[task_2, task_3] >> task_4
Run Code Online (Sandbox Code Playgroud)
现在我们调用任务的方式有所不同PythonOperator
如何使用 TaskFlow 制作列表?
谢谢
我正在尝试为使用Airflow TaskFlow API构建的一些任务编写单元测试。我尝试了多种方法,例如,通过创建 dagrun 或仅运行任务函数,但没有任何帮助。
这是我从 S3 下载文件的任务,还有更多内容,但我在本示例中删除了它。
@task()
def updates_process(files):
context = get_current_context()
try:
updates_file_path = utils.download_file_from_s3_bucket(files.get("updates_file"))
except FileNotFoundError as e:
log.error(e)
return
# Do something else
Run Code Online (Sandbox Code Playgroud)
现在我试图编写一个测试用例,我可以在其中检查这个 except 子句。以下是我开始的例子
class TestAccountLinkUpdatesProcess(TestCase):
@mock.patch("dags.delta_load.updates.log")
@mock.patch("dags.delta_load.updates.get_current_context")
@mock.patch("dags.delta_load.updates.utils.download_file_from_s3_bucket")
def test_file_not_found_error(self, download_file_from_s3_bucket, get_current_context, log):
download_file_from_s3_bucket.side_effect = FileNotFoundError
task = account_link_updates_process({"updates_file": "path/to/file.csv"})
get_current_context.assert_called_once()
log.error.assert_called_once()
Run Code Online (Sandbox Code Playgroud)
我还尝试创建一个 dagrun(如文档中的示例所示)并从 dagrun 获取任务,但这也没有帮助。
我在 Airflow 的 TaskFlowAPI 中找不到分支的文档。我尝试以“Pythonic”方式进行操作,但是当运行时,DAG 没有看到task_2_execute_if_true,无论前一个任务返回的真值如何。
@dag(
schedule_interval=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['test'],
)
def my_dag():
@task()
def task_1_returns_boolean():
# evaluate and return boolean value
return boolean_value
@task()
def task_2_execute_if_true():
# do_something...
outcome_1 = task_1_returns_boolean()
if outcome_1:
outcome_2 = task_2_execute_if_true()
executed = my_dag()
Run Code Online (Sandbox Code Playgroud)
TaskFlowAPI 中正确的分支方式是什么?我应该再添加一个专门用于分支的函数吗?
我的 dag 是从配置 JSON 开始的:
\n{"foo" : "bar"}\nRun Code Online (Sandbox Code Playgroud)\n我有一个使用这个值的Python运算符:
\nmy_task = PythonOperator(\n task_id="my_task",\n op_kwargs={"foo": "{{ dag_run.conf[\'foo\'] }}"},\n python_callable=lambda foo: print(foo))\nRun Code Online (Sandbox Code Playgroud)\n我\xe2\x80\x99d喜欢用TaskFlow任务\xe2\x80\xa6替换它
\nmy_task = PythonOperator(\n task_id="my_task",\n op_kwargs={"foo": "{{ dag_run.conf[\'foo\'] }}"},\n python_callable=lambda foo: print(foo))\nRun Code Online (Sandbox Code Playgroud)\n如何从此处获取对 context、dag_run 的引用或以其他方式获取配置 JSON?
\n