Kar*_*Kar 3 airflow airflow-2.x
需求:根据SQL的返回值分支下一个任务
我尝试使用branchsqloperator实现
错误:但出现 TypeError: 'XComArg' object is not iterable 错误
下面是SQL查询
SELECT
CASE
WHEN COUNT(*) > 2 THEN 0 --false
ELSE 1 --true
END
FROM
TABLE_A;
Run Code Online (Sandbox Code Playgroud)
达格:
@task(task_id="task_a")
def task_a():
return "job_success"
@task(task_id="task_b")
def task_b():
return "job_failure"
task_a = task_a()
task_b = task_b()
branch_sql = BranchSQLOperator(
task_id='branch_sql',
sql='query.sql',
follow_task_ids_if_true = task_a,
follow_task_ids_if_false = task_b,
conn_id = 'default_conn_id',
)
branch_sql >> [task_a,task_b]
Run Code Online (Sandbox Code Playgroud)
[![在此处输入图像描述][1]][1]
以下是日志:
[2022-07-20, 21:11:27 UTC] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
[2022-07-20, 21:11:27 UTC] {base.py:68} INFO - Using connection ID 'snflk_conn_id' for task execution.
[2022-07-20, 21:11:27 UTC] {base.py:68} INFO - Using connection ID 'snflk_conn_id' for task execution.
[2022-07-20, 21:11:27 UTC] {connection.py:272} INFO - Snowflake Connector for Python Version: 2.7.1, Python Version: 3.7.13, Platform: Linux-5.10.104-linuxkit-x86_64-with-debian-11.3
[2022-07-20, 21:11:27 UTC] {connection.py:879} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2022-07-20, 21:11:27 UTC] {connection.py:896} INFO - Setting use_openssl_only mode to False
[2022-07-20, 21:11:28 UTC] {cursor.py:696} INFO - query: [SELECT CASE WHEN COUNT(*) > 2 THEN 0 --false ELSE 1 --true END FROM TABLE...]
[2022-07-20, 21:11:28 UTC] {cursor.py:720} INFO - query execution done
[2022-07-20, 21:11:28 UTC] {connection.py:509} INFO - closed
[2022-07-20, 21:11:28 UTC] {connection.py:512} INFO - No async queries seem to be running, deleting session
[2022-07-20, 21:11:28 UTC] {sql.py:531} INFO - Query returns 1, type '<class 'int'>'
[2022-07-20, 21:11:28 UTC] {skipmixin.py:140} INFO - Following branch {{ task_instance.xcom_pull(task_ids='task_a', dag_id='abc', key='return_value') }}
[2022-07-20, 21:11:28 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/sql.py", line 557, in execute
self.skip_all_except(context["ti"], follow_branch)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/skipmixin.py", line 146, in skip_all_except
branch_task_ids = set(branch_task_ids)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom_arg.py", line 92, in __iter__
raise TypeError(f"{self.__class__.__name__!r} object is not iterable")
TypeError: 'XComArg' object is not iterable
[2022-07-20, 21:11:28 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=abc, task_id=branch_sql, execution_date=20220720T211124, start_date=20220720T211126, end_date=20220720T211128
[2022-07-20, 21:11:28 UTC] {standard_task_runner.py:97} ERROR - Failed to execute job 1034 for task branch_sql ('XComArg' object is not iterable; 5827)
[2022-07-20, 21:11:28 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2022-07-20, 21:11:28 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
Run Code Online (Sandbox Code Playgroud)
预计BranchSQLOperator会返回task_id,但在您的情况下,您将返回一个类对象。这也是错误状态,{self.__class__.__name__!r} object is not iterable
在您的情况下,task_a 和 task_b 是类对象。"task_a"是"task_b"task_id
你应该更换
follow_task_ids_if_true = task_a,
follow_task_ids_if_false = task_b,
Run Code Online (Sandbox Code Playgroud)
和
follow_task_ids_if_true = "task_a",
follow_task_ids_if_false = "task_b",
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5765 次 |
| 最近记录: |