我真的是这个论坛的新手.但是我一直在为我们的公司玩气流.对不起,这个问题听起来真的很蠢.
我正在使用一堆BashOperator编写一个管道.基本上,对于每个任务,我想简单地使用'curl'调用REST api
这是我的管道看起来像(非常简化的版本):
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime
datetime_obj = datetime.datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
'email': ['xxxx@xxx.xxx'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
}
current_datetime = datetime_obj.now(tz=tz.tzlocal())
dag = DAG(
'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))
curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'
t1 = BashOperator(
task_id='rest-api-1',
bash_command=curl_cmd,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
如果你注意到我在做什么current_datetime= datetime_obj.now(tz=tz.tzlocal())
而不是我想要的是'execution_date'
如何直接使用'execution_date'并将其分配给我的python文件中的变量?
我有这个访问args的一般问题.任何帮助将得到真诚的感谢.
谢谢
小智 36
的BashOperator
的bash_command
说法是一个模板.您可以使用变量execution_date
在任何模板中作为datetime
对象访问execution_date
.在模板中,您可以使用任何jinja2
方法来操作它.
使用以下作为您的BashOperator
bash_command
字符串:
# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}
# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}
Run Code Online (Sandbox Code Playgroud)
如果您只想要相当于执行日期的字符串,ds
则返回日期戳(YYYY-MM-DD),ds_nodash
返回相同而不带破折号(YYYYMMDD)等.Api Docs中macros
提供了更多信息.
您的最终运营商将如下所示:
command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)
Run Code Online (Sandbox Code Playgroud)
Zig*_*ien 27
PythonOperator构造函数采用'provide_context'参数(请参阅https://pythonhosted.org/airflow/code.html).如果它是True,那么它通过kwargs将许多参数传递给python_callable.我相信kwargs ['execution_date']就是你想要的.
像这样的东西:
def python_method(ds, **kwargs):
Variable.set('execution_date', kwargs['execution_date'])
return
doit = PythonOperator(
task_id='doit',
provide_context=True,
python_callable=python_method,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
我不知道如何使用BashOperator,但你可能会从这个问题开始:https://github.com/airbnb/airflow/issues/775
Bab*_*ool 14
我认为您不能使用来自任务实例之外的气流上下文的值来分配变量,它们仅在运行时可用.当dag在气流中加载并执行时,基本上有两个不同的步骤:
首先解释和解析你的dag文件.它必须工作和编译,任务定义必须正确(没有语法错误或任何东西).在此步骤中,如果您进行函数调用以填充某些值,则这些函数将无法访问气流上下文(例如,执行日期,如果您正在进行某些回填,则更多).
第二步是执行dag.只有在第二步中,airflow(execution_date, ds, etc...
)提供的变量才可用,因为它们与执行dag有关.
因此,您无法使用Airflow上下文初始化全局变量,但是,Airflow为您提供了多种机制来实现相同的效果:
在命令中使用jinja模板(它可以在代码或文件中的字符串中,两者都将被处理).您可以在此处获取可用模板列表:https://airflow.apache.org/code.html#default-variables.请注意,某些功能也可用,特别是对于计算天数增量和日期格式.
使用PythonOperator,在其中传递上下文(带provide_context
参数).这将允许您使用语法访问相同的模板kwargs['<variable_name']
.如果需要,可以从PythonOperator返回一个值,这个值将存储在一个XCOM变量中,以后可以在任何模板中使用.访问XCOM变量使用以下语法:https://airflow.apache.org/concepts.html#xcoms
如果您编写自己的运算符,则可以使用dict访问气流变量context
.
l0n*_*g3r 10
def execute(self, context):
execution_date = context.get("execution_date")
Run Code Online (Sandbox Code Playgroud)
这应该在Operator的execute()方法内
小智 5
要打印执行日期您的可调用的函数内部PythonOperator
,你可以使用下面你气流脚本,也可以添加start_time
和end_time
如下:
def python_func(**kwargs):
ts = kwargs["execution_date"]
end_time = str(ts)
start_time = str(ts.add(minutes=-30))
Run Code Online (Sandbox Code Playgroud)
我已将日期时间值转换为字符串,因为我需要在 SQL 查询中传递它。我们也可以以其他方式使用它。
归档时间: |
|
查看次数: |
45945 次 |
最近记录: |