对 Airflow 的 BaseSensorOperator 参数感到困惑:超时、poke_interval 和模式

Aet*_*tos 6 airflow google-cloud-composer

我对BaseSensorOperator参数的工作方式有点困惑:timeout & poke_interval。考虑传感器的这种用法:

BaseSensorOperator(
  soft_fail=True,
  poke_interval = 4*60*60,  # Poke every 4 hours
  timeout = 12*60*60,  # Timeout after 12 hours
)
Run Code Online (Sandbox Code Playgroud)

文档提到超时行为在任务用完后将任务设置为“失败”。但是我使用的是soft_fail=True,我认为它不会保留相同的行为,因为我发现任务失败而不是在我同时使用参数soft_failtimeout.

那么这里发生了什么?

  1. 传感器每4小时戳一次,每戳一次,是否会等待超时时间(12小时)?
  2. 还是每 4 小时戳一次,总共戳 3 次,然后超时?
  3. 另外,如果我使用 mode="reschedule",这些参数会发生什么?

这是 BaseSensorOperator 的文档

class BaseSensorOperator(BaseOperator, SkipMixin):
    """
    Sensor operators are derived from this class and inherit these attributes.
    Sensor operators keep executing at a time interval and succeed when
    a criteria is met and fail if and when they time out.
    :param soft_fail: Set to true to mark the task as SKIPPED on failure
    :type soft_fail: bool
    :param poke_interval: Time in seconds that the job should wait in
        between each tries
    :type poke_interval: int
    :param timeout: Time, in seconds before the task times out and fails.
    :type timeout: int
    :param mode: How the sensor operates.
        Options are: ``{ poke | reschedule }``, default is ``poke``.
        When set to ``poke`` the sensor is taking up a worker slot for its
        whole execution time and sleeps between pokes. Use this mode if the
        expected runtime of the sensor is short or if a short poke interval
        is requried.
        When set to ``reschedule`` the sensor task frees the worker slot when
        the criteria is not yet met and it's rescheduled at a later time. Use
        this mode if the expected time until the criteria is met is. The poke
        inteval should be more than one minute to prevent too much load on
        the scheduler.
    :type mode: str
    """
Run Code Online (Sandbox Code Playgroud)

y2k*_*ham 10

定义条款

  1. poke_interval: b/w 连续“戳”的持续时间(评估被“感知”的必要条件)

  2. timeout:只是无限期地是不可接受的(例如,如果您的错误代码在当月为 2 时每天戳到 29,它将继续戳长达 4 年)。因此,我们定义了一个最大时间段,超过该时间段我们将停止并终止(传感器标记为FAILEDSKIPPED

  3. soft_fail:通常(当soft_fail=False)时,传感器被标记为FAILED超时后。当soft_fail=True,传感器将被标记为SKIPPED超时后

  4. mode: 这个有点复杂

    • 任何任务(包括传感器)在运行时,都会占用slot某个池(default池或明确指定pool)中的一个;本质上意味着它占用了一些资源。
    • 对于传感器,这是
      • 浪费:因为即使我们只是在等待(没有做实际工作),一个插槽也被消耗了
      • 危险:如果您的工作流程有太多传感器同时进行感应,它们可能会冻结大量资源一段时间。事实上太多具有ExternalTaskSensors是臭名昭著的投入整个工作流程(DAG)的进入死锁
    • 为了克服这个问题,Airflow v1.10.2 mode在传感器中引入了s
      • mode='poke' (默认)表示我们上面讨论的现有行为
      • mode='reschedule'意味着在poke 尝试之后,而不是进入 sleep,传感器将表现得好像它失败了(在当前尝试中)并且它的状态将从RUNNING变为UP_FOR_RETRY。这样,它将释放它的插槽,允许其他任务在等待另一次poke 尝试时进行
    • 在此处引用代码中的相关片段
    if self.reschedule:
        reschedule_date = timezone.utcnow() + timedelta(
            seconds=self._get_next_poke_interval(started_at, try_number))
        raise AirflowRescheduleException(reschedule_date)
    else:
        sleep(self._get_next_poke_interval(started_at, try_number))
        try_number += 1
    
    Run Code Online (Sandbox Code Playgroud)

现在直接回答你的问题

第一季度

  1. 传感器每4小时戳一次,每戳一次,是否会等待超时时间(12小时)?
  2. 还是每 4 小时戳一次,总共戳 3 次,然后超时?

第2点是正确的

Q2

另外,如果我使用 mode="reschedule",这些参数会发生什么?

如前所述,这些参数中的每一个都是独立的,并且设置mode='reschedule'不会以任何方式改变它们的行为

  • 我会把它给你**@Aetos**。我认为在早期,他们可能没有想到传感器会被如此广泛地使用;多年来,Airflow 的用户和用例取得了很大进步。即使是这样; 当 `poke_interval` 很小时(比如大约 1 分钟),`mode='poke'` 仍然呈现出强有力的情况(并且实际上最小化了重复调度开销);[Astronomer.io 大家](https://www.astronomer.io/guides/what-is-a-sensor/) 说这个“”..“poke”模式:如果传感器的预期运行时间是,请使用此模式短或如果需要短的刺戳间隔.."` (2认同)

San*_*ram 5

BaseSensorOperator(
  soft_fail=True,
  poke_interval = 4*60*60,  # Poke every 4 hours
  timeout = 12*60*60,  # Timeout of 12 hours
  mode = "reschedule"
)
Run Code Online (Sandbox Code Playgroud)

假设第一次戳时不满足标准。所以它会在间隔4小时后再次运行。但工作槽将在等待期间被释放,因为我们正在使用mode="reschedule".

这就是我的理解。