气流获取重试次数

Kyl*_*ine 3 airflow

在我的Airflow DAG中,我有一个任务需要知道它是第一次运行还是重试运行.如果是重试尝试,我需要在任务中调整逻辑.

我对如何存储任务的重试次数有一些想法,但我不确定它们中的任何一个是否合法,或者是否有更容易的内置方式来获取任务中的这些信息.

  • 我想知道我是否可以在每次任务运行时附加的dag中都有一个整数变量.然后,如果任务重新我可以检查变量的值,看它是否大于1,因此将是重试运行.但我不确定可变全局变量是否在Airflow中以这种方式工作,因为可以有多个工作人员执行不同的任务(虽然我不确定).

  • 把它写在XCOM变量中?

cwu*_*rtz 9

可以从任务实例获得重试次数,该任务实例可通过宏获得{{ task_instance }}.https://airflow.apache.org/code.html#default-variables

如果您正在使用python运算符,只需添加provide_context=True,到您的运算符kwargs,然后在可调用do中kwargs['task_instance'].try_number

否则你可以这样做:

t = BashOperator(
    task_id='try_number_test',
    bash_command='echo "{{ task_instance.try_number }}"',
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

编辑:

清除任务实例时,会将max_retry编号设置为当前try_number +重试值.所以你可以这样做:

ti = # whatever method you do to get the task_instance object
is_first = ti.max_tries - ti.task.retries + 1 == ti.try_number
Run Code Online (Sandbox Code Playgroud)

当运行时,气流会将try_number增加1,所以我想你在从配置的重试值中减去max_tries时需要+ 1.但我没有测试,以确认


use*_*886 6

@cwurtz 的答案很正确。我能够像这样使用它:

    def _get_actual_try_number(self, context):
        '''
        Returns the real try_number that you also see in task details or logs.
        '''
        return context['task_instance'].try_number

    def _get_relative_try_number(self, context):
        '''
        When a task is cleared, the try_numbers continue to increment.
        This returns the try number relative to the last clearing.
        '''
        ti = context['task_instance']
        actual_try_number = self._get_actual_try_number(context)

        # When the task instance is cleared, it will set the max_retry
        # number to be the current try_number + retry value.
        # From https://stackoverflow.com/a/51757521
        relative_first_try = ti.max_tries - ti.task.retries + 1

        return actual_try_number - relative_first_try + 1
Run Code Online (Sandbox Code Playgroud)