airflow中的execution_date:需要作为变量访问

Rog*_*ger 45 airflow

我真的是这个论坛的新手.但是我一直在为我们的公司玩气流.对不起,这个问题听起来真的很蠢.

我正在使用一堆BashOperator编写一个管道.基本上,对于每个任务,我想简单地使用'curl'调用REST api

这是我的管道看起来像(非常简化的版本):

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['xxxx@xxx.xxx'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

如果你注意到我在做什么current_datetime= datetime_obj.now(tz=tz.tzlocal()) 而不是我想要的是'execution_date'

如何直接使用'execution_date'并将其分配给我的python文件中的变量?

我有这个访问args的一般问题.任何帮助将得到真诚的感谢.

谢谢

小智 36

BashOperatorbash_command 说法是一个模板.您可以使用变量execution_date在任何模板中作为datetime 对象访问execution_date.在模板中,您可以使用任何jinja2方法来操作它.

使用以下作为您的BashOperator bash_command 字符串:

# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}

# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}
Run Code Online (Sandbox Code Playgroud)

如果您只想要相当于执行日期的字符串,ds则返回日期戳(YYYY-MM-DD),ds_nodash返回相同而不带破折号(YYYYMMDD)等.Api Docs中macros提供了更多信息.


您的最终运营商将如下所示:

command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)
Run Code Online (Sandbox Code Playgroud)

  • 这是正确的答案.我只是编辑它来显示任务的完整版本,例如`t1 = BashOperator(task_id ='rest-api-1',bash_command ='curl -XPOST"'+ hostname +':8000/run?st = {{ execution_date}}"',dag = dag)` (2认同)
  • 想要用 `Python3` fstrings 信息更新它,`command =f """..."""` 似乎不起作用。为了让 `jinja2` 模​​板工作,我认为你不能使用 **`fstrings`** (2认同)

Zig*_*ien 27

PythonOperator构造函数采用'provide_context'参数(请参阅https://pythonhosted.org/airflow/code.html).如果它是True,那么它通过kwargs将许多参数传递给python_callable.我相信kwargs ['execution_date']就是你想要的.

像这样的东西:

def python_method(ds, **kwargs):
    Variable.set('execution_date', kwargs['execution_date'])
    return

doit = PythonOperator(
    task_id='doit',
    provide_context=True,
    python_callable=python_method,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

我不知道如何使用BashOperator,但你可能会从这个问题开始:https://github.com/airbnb/airflow/issues/775

  • 谢谢.使用这种方法,我将有一个任务t1,它将是一个带有provide_context = true的PythonOperator实例,它允许我使用kwargs ['execution_date']我将设置并返回current_datetime ='execution_date'.然后我创建我的任务t2:BashOperator:我将拉(使用XCOM)并使用我的变量.所以你看,我必须创建2个任务.这有点不性感;)我确信(我希望我是对的)有一种方法可以直接在python代码中访问'execution_date',而无需使用PythonOperator.但我无法弄清楚如何做到这一点:( (3认同)

Bab*_*ool 14

我认为您不能使用来自任务实例之外的气流上下文的值来分配变量,它们仅在运行时可用.当dag在气流中加载并执行时,基本上有两个不同的步骤:

  • 首先解释和解析你的dag文件.它必须工作和编译,任务定义必须正确(没有语法错误或任何东西).在此步骤中,如果您进行函数调用以填充某些值,则这些函数将无法访问气流上下文(例如,执行日期,如果您正在进行某些回填,则更多).

  • 第二步是执行dag.只有在第二步中,airflow(execution_date, ds, etc...)提供的变量才可用,因为它们与执行dag有关.

因此,您无法使用Airflow上下文初始化全局变量,但是,Airflow为您提供了多种机制来实现相同的效果:

  1. 在命令中使用jinja模板(它可以在代码或文件中的字符串中,两者都将被处理).您可以在此处获取可用模板列表:https://airflow.apache.org/code.html#default-variables.请注意,某些功能也可用,特别是对于计算天数增量和日期格式.

  2. 使用PythonOperator,在其中传递上下文(带provide_context参数).这将允许您使用语法访问相同的模板kwargs['<variable_name'].如果需要,可以从PythonOperator返回一个值,这个值将存储在一个XCOM变量中,以后可以在任何模板中使用.访问XCOM变量使用以下语法:https://airflow.apache.org/concepts.html#xcoms

  3. 如果您编写自己的运算符,则可以使用dict访问气流变量context.

  • 正如上面其他问题中所指出的,技术上有 3 种方法可以做到这一点。使用 jinja 模板,在 python_callable 中使用 kwargs,或在操作符中使用 context['execution_date']。可能最好完全删除此答案,或者至少删除大部分内容。 (3认同)

l0n*_*g3r 10

def execute(self, context):
    execution_date = context.get("execution_date")
Run Code Online (Sandbox Code Playgroud)

这应该在Operator的execute()方法内

  • 如果您正在构建自定义运算符,这可能是您想要的。 (3认同)

小智 5

要打印执行日期您的可调用的函数内部PythonOperator,你可以使用下面你气流脚本,也可以添加start_timeend_time如下:

def python_func(**kwargs):
    ts = kwargs["execution_date"]
    end_time = str(ts)
    start_time = str(ts.add(minutes=-30))
Run Code Online (Sandbox Code Playgroud)

我已将日期时间值转换为字符串,因为我需要在 SQL 查询中传递它。我们也可以以其他方式使用它。