Airflow: BranchSQLOperator: TypeError: 'XComArg' object is not iterable

Kar*_*Kar 3 airflow airflow-2.x

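Requirement: branch to the next task based on the value returned by a SQL query.

I tried to implement this with BranchSQLOperator.

Error: the task fails with TypeError: 'XComArg' object is not iterable.

Below is the SQL query: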

SELECT
    CASE
        WHEN COUNT(*) > 2 THEN 0 --false
        ELSE 1 --true
    END
FROM
    TABLE_A;

DAG:

@task(task_id="task_a")
def task_a():
    return "job_success"

@task(task_id="task_b")
def task_b():
    return "job_failure"

task_a = task_a()
task_b = task_b()
branch_sql = BranchSQLOperator(
    task_id='branch_sql',
    sql='query.sql',
    follow_task_ids_if_true = task_a,
    follow_task_ids_if_false = task_b,
    conn_id = 'default_conn_id',
)

branch_sql >> [task_a,task_b]


Here are the logs:

[2022-07-20, 21:11:27 UTC] {crypto.py:82} WARNING - empty cryptography key - values will not be stored encrypted.
[2022-07-20, 21:11:27 UTC] {base.py:68} INFO - Using connection ID 'snflk_conn_id' for task execution.
[2022-07-20, 21:11:27 UTC] {base.py:68} INFO - Using connection ID 'snflk_conn_id' for task execution.
[2022-07-20, 21:11:27 UTC] {connection.py:272} INFO - Snowflake Connector for Python Version: 2.7.1, Python Version: 3.7.13, Platform: Linux-5.10.104-linuxkit-x86_64-with-debian-11.3
[2022-07-20, 21:11:27 UTC] {connection.py:879} INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.
[2022-07-20, 21:11:27 UTC] {connection.py:896} INFO - Setting use_openssl_only mode to False
[2022-07-20, 21:11:28 UTC] {cursor.py:696} INFO - query: [SELECT CASE WHEN COUNT(*) > 2 THEN 0 --false ELSE 1 --true END FROM TABLE...]
[2022-07-20, 21:11:28 UTC] {cursor.py:720} INFO - query execution done
[2022-07-20, 21:11:28 UTC] {connection.py:509} INFO - closed
[2022-07-20, 21:11:28 UTC] {connection.py:512} INFO - No async queries seem to be running, deleting session
[2022-07-20, 21:11:28 UTC] {sql.py:531} INFO - Query returns 1, type '<class 'int'>'
[2022-07-20, 21:11:28 UTC] {skipmixin.py:140} INFO - Following branch {{ task_instance.xcom_pull(task_ids='task_a', dag_id='abc', key='return_value') }}
[2022-07-20, 21:11:28 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/sql.py", line 557, in execute
    self.skip_all_except(context["ti"], follow_branch)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/skipmixin.py", line 146, in skip_all_except
    branch_task_ids = set(branch_task_ids)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom_arg.py", line 92, in __iter__
    raise TypeError(f"{self.__class__.__name__!r} object is not iterable")
TypeError: 'XComArg' object is not iterable
[2022-07-20, 21:11:28 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=abc, task_id=branch_sql, execution_date=20220720T211124, start_date=20220720T211126, end_date=20220720T211128
[2022-07-20, 21:11:28 UTC] {standard_task_runner.py:97} ERROR - Failed to execute job 1034 for task branch_sql ('XComArg' object is not iterable; 5827)
[2022-07-20, 21:11:28 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2022-07-20, 21:11:28 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

Ela*_*lad 5

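BranchSQLOperator expects task_ids (strings) for the branches, but in your case you are passing objects. That is also what the error states: 'XComArg' object is not iterable. In your code, task_a and task_b are XComArg objects, because that is what calling a @task-decorated function returns; "task_a" and "task_b" are the task_ids.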

You should replace

follow_task_ids_if_true = task_a,
follow_task_ids_if_false = task_b,

with

follow_task_ids_if_true = "task_a",
follow_task_ids_if_false = "task_b",
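
To see why the string works and the object does not, here is a minimal sketch that runs without Airflow. `FakeXComArg` and `normalize_branch_ids` are hypothetical stand-ins, not Airflow APIs: the first imitates the `__iter__` shown at xcom_arg.py line 92 in the traceback, and the second imitates the `set(branch_task_ids)` step in `skip_all_except`. I am assuming that a lone string is wrapped in a set first, rather than being iterated character by character.

```python
class FakeXComArg:
    """Hypothetical stand-in for Airflow's XComArg (not the real class)."""

    def __init__(self, task_id):
        self.task_id = task_id

    def __iter__(self):
        # Imitates the raise shown at xcom_arg.py line 92 in the traceback.
        raise TypeError(f"{self.__class__.__name__!r} object is not iterable")


def normalize_branch_ids(branch_task_ids):
    """Simplified imitation of the set() step in skip_all_except."""
    if isinstance(branch_task_ids, str):
        # Assumption: a single task_id string is wrapped, not split into characters.
        return {branch_task_ids}
    # Anything else must be iterable; an XComArg-like object fails here.
    return set(branch_task_ids)


# Passing the task_id string works.
ok = normalize_branch_ids("task_a")

# Passing the object returned by the @task call fails, as in the question.
try:
    normalize_branch_ids(FakeXComArg("task_a"))
    error = None
except TypeError as exc:
    error = str(exc)
```

A list of task_id strings goes through the same path, so `normalize_branch_ids(["task_a", "task_b"])` is also fine in this sketch. The dependency line `branch_sql >> [task_a, task_b]` can stay as it is; only the two `follow_task_ids_*` arguments need to be strings.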