Aet*_*tos 6 airflow google-cloud-composer
我对BaseSensorOperator参数的工作方式有点困惑:timeout & poke_interval。考虑传感器的这种用法:
BaseSensorOperator(
soft_fail=True,
poke_interval = 4*60*60, # Poke every 4 hours
timeout = 12*60*60, # Timeout after 12 hours
)
Run Code Online (Sandbox Code Playgroud)
文档提到超时行为在任务用完后将任务设置为“失败”。但是我使用的是soft_fail=True,我认为它不会保留相同的行为,因为我发现任务失败而不是在我同时使用参数soft_fail和timeout.
那么这里发生了什么?
这是 BaseSensorOperator 的文档
class BaseSensorOperator(BaseOperator, SkipMixin):
"""
Sensor operators are derived from this class and inherit these attributes.
Sensor operators keep executing at a time interval and succeed when
a criteria is met and fail if and when they time out.
:param soft_fail: Set to true to mark the task as SKIPPED on failure
:type soft_fail: bool
:param poke_interval: Time in seconds that the job should wait in
between each tries
:type poke_interval: int
:param timeout: Time, in seconds before the task times out and fails.
:type timeout: int
:param mode: How the sensor operates.
Options are: ``{ poke | reschedule }``, default is ``poke``.
When set to ``poke`` the sensor is taking up a worker slot for its
whole execution time and sleeps between pokes. Use this mode if the
expected runtime of the sensor is short or if a short poke interval
is requried.
When set to ``reschedule`` the sensor task frees the worker slot when
the criteria is not yet met and it's rescheduled at a later time. Use
this mode if the expected time until the criteria is met is. The poke
inteval should be more than one minute to prevent too much load on
the scheduler.
:type mode: str
"""
Run Code Online (Sandbox Code Playgroud)
y2k*_*ham 10
定义条款
poke_interval: b/w 连续“戳”的持续时间(评估被“感知”的必要条件)
timeout:只是无限期地戳是不可接受的(例如,如果您的错误代码在当月为 2 时每天戳到 29,它将继续戳长达 4 年)。因此,我们定义了一个最大时间段,超过该时间段我们将停止戳并终止(传感器标记为FAILED或SKIPPED)
soft_fail:通常(当soft_fail=False)时,传感器被标记为FAILED超时后。当soft_fail=True,传感器将被标记为SKIPPED超时后
mode: 这个有点复杂
slot某个池(default池或明确指定pool)中的一个;本质上意味着它占用了一些资源。mode在传感器中引入了s
mode='poke' (默认)表示我们上面讨论的现有行为mode='reschedule'意味着在poke 尝试之后,而不是进入 sleep,传感器将表现得好像它失败了(在当前尝试中)并且它的状态将从RUNNING变为UP_FOR_RETRY。这样,它将释放它的插槽,允许其他任务在等待另一次poke 尝试时进行Run Code Online (Sandbox Code Playgroud)if self.reschedule: reschedule_date = timezone.utcnow() + timedelta( seconds=self._get_next_poke_interval(started_at, try_number)) raise AirflowRescheduleException(reschedule_date) else: sleep(self._get_next_poke_interval(started_at, try_number)) try_number += 1
现在直接回答你的问题
第一季度
- 传感器每4小时戳一次,每戳一次,是否会等待超时时间(12小时)?
- 还是每 4 小时戳一次,总共戳 3 次,然后超时?
第2点是正确的
Q2
另外,如果我使用 mode="reschedule",这些参数会发生什么?
如前所述,这些参数中的每一个都是独立的,并且设置mode='reschedule'不会以任何方式改变它们的行为
BaseSensorOperator(
soft_fail=True,
poke_interval = 4*60*60, # Poke every 4 hours
timeout = 12*60*60, # Timeout of 12 hours
mode = "reschedule"
)
Run Code Online (Sandbox Code Playgroud)
假设第一次戳时不满足标准。所以它会在间隔4小时后再次运行。但工作槽将在等待期间被释放,因为我们正在使用mode="reschedule".
这就是我的理解。
| 归档时间: |
|
| 查看次数: |
2882 次 |
| 最近记录: |