可以从任务实例获得重试次数,该任务实例可通过宏获得{{ task_instance }}.https://airflow.apache.org/code.html#default-variables
如果您正在使用python运算符,只需添加provide_context=True,到您的运算符kwargs,然后在可调用do中kwargs['task_instance'].try_number
否则你可以这样做:
t = BashOperator(
task_id='try_number_test',
bash_command='echo "{{ task_instance.try_number }}"',
dag=dag)
Run Code Online (Sandbox Code Playgroud)
编辑:
清除任务实例时,会将max_retry编号设置为当前try_number +重试值.所以你可以这样做:
ti = # whatever method you do to get the task_instance object
is_first = ti.max_tries - ti.task.retries + 1 == ti.try_number
Run Code Online (Sandbox Code Playgroud)
当运行时,气流会将try_number增加1,所以我想你在从配置的重试值中减去max_tries时需要+ 1.但我没有测试,以确认
@cwurtz 的答案很正确。我能够像这样使用它:
def _get_actual_try_number(self, context):
'''
Returns the real try_number that you also see in task details or logs.
'''
return context['task_instance'].try_number
def _get_relative_try_number(self, context):
'''
When a task is cleared, the try_numbers continue to increment.
This returns the try number relative to the last clearing.
'''
ti = context['task_instance']
actual_try_number = self._get_actual_try_number(context)
# When the task instance is cleared, it will set the max_retry
# number to be the current try_number + retry value.
# From https://stackoverflow.com/a/51757521
relative_first_try = ti.max_tries - ti.task.retries + 1
return actual_try_number - relative_first_try + 1
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2141 次 |
| 最近记录: |