将其他参数传递给on_failure_callback

Geo*_*oor 3 airflow

我想将其他参数传递给我的on_failure_callback函数,但它似乎只需要“上下文”。我如何将其他参数传递给该函数……尤其是因为我想在一个单独的模块中定义该函数,以便可以在我的所有DAGS中使用它。

我当前的default_args看起来像这样:

default_args = {
  'owner': 'Me',
  'depends_on_past': True,
  'start_date': datetime(2016,01,01),
  'email': ['me@me.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=1),
  'on_failure_callback': notify_failure,
  'max_active_runs': 1
}
Run Code Online (Sandbox Code Playgroud)

如果我尝试类似这种气流抱怨:

default_args = {
  'owner': 'Me',
  'depends_on_past': True,
  'start_date': datetime(2016,01,01),
  'email': ['me@me.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=1),
  'on_failure_callback': notify_failure(context,arg1,arg2),
  'max_active_runs': 1
}
Run Code Online (Sandbox Code Playgroud)

所以不确定如何将arg1和arg2传递给我想在一个单独模块中定义的notify_failure函数,我可以将其简单地导入到DAG中

cwu*_*rtz 8

假设您可以在DAG级别定义args,则可以使用partials包。即:

from functools import partial

def generic_failure(arg1, arg2, context):
  # do whatever

default_args = {
  'owner': 'Me',
  'depends_on_past': True,
  'start_date': datetime(2016,01,01),
  'email': ['me@me.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=1),
  'on_failure_callback': partial(generic_failure, arg1, arg2),
  'max_active_runs': 1
}
Run Code Online (Sandbox Code Playgroud)

调用partial(generic_failure, arg1, arg2)将返回一个函数,该函数期望保留许多参数generic_failure,但在上面的示例中,这只是单个参数context

  • 这个答案应该被接受,它优雅且有效! (2认同)

Vin*_*aes 5

您可以为此使用嵌套函数

def generic_failure(arg1, arg2):
    def failure(context):
        message = 'we have a function that failed witg args : {ARG1}, {ARG2}'.format(ARG1=arg1,ARG2=arg2)
        print(message)
        return message
    return failure

arg1 = 'arg1'
arg2 = 'arg2'

default_args = {
  'owner': 'Me',
  'on_failure_callback': generic_failure(arg1, arg2),
}
Run Code Online (Sandbox Code Playgroud)