我真的是这个论坛的新手.但是我一直在为我们的公司玩气流.对不起,这个问题听起来真的很蠢.
我正在使用一堆BashOperator编写一个管道.基本上,对于每个任务,我想简单地使用'curl'调用REST api
这是我的管道看起来像(非常简化的版本):
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime
datetime_obj = datetime.datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
'email': ['xxxx@xxx.xxx'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
}
current_datetime = datetime_obj.now(tz=tz.tzlocal())
dag = DAG(
'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))
curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'
t1 = BashOperator(
task_id='rest-api-1',
bash_command=curl_cmd,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
如果你注意到我在做什么current_datetime= datetime_obj.now(tz=tz.tzlocal())
而不是我想要的是'execution_date'
如何直接使用'execution_date'并将其分配给我的python文件中的变量?
我有这个访问args的一般问题.任何帮助将得到真诚的感谢.
谢谢
嗨,大家好,
我需要使用airflow来安排我的python 文件(其中包含从sql和一些连接中提取数据).我已经成功地将气流安装到我的linux服务器中,我可以使用气流网络服务器.但即使在完成文档后,我也不清楚我需要在哪里编写脚本以进行调度以及该脚本如何在airflow webserver中可用,这样我才能看到状态
就配置而言,我知道dag文件夹在我的主目录中的位置以及示例dags所在的位置.
注意:请不要将此标记为重复与如何在Airflow中运行bash脚本文件,因为我需要运行位于不同位置的python文件.
I just started using Airflow, can anyone enlighten me how to pass a parameter into PythonOperator like below:
t5_send_notification = PythonOperator(
task_id='t5_send_notification',
provide_context=True,
python_callable=SendEmail,
op_kwargs=None,
#op_kwargs=(key1='value1', key2='value2'),
dag=dag,
)
def SendEmail(**kwargs):
msg = MIMEText("The pipeline for client1 is completed, please check.")
msg['Subject'] = "xxxx"
msg['From'] = "xxxx"
......
s = smtplib.SMTP('localhost')
s.send_message(msg)
s.quit()
Run Code Online (Sandbox Code Playgroud)
I would like to be able to pass some parameters into the t5_send_notification's callable which is SendEmail, ideally I want to attach the full …
我知道之前有人问过这个问题,但没有一个答案回答它。我开始有点发疯了!我很困惑所以真的很感激一些帮助。
我有一个带有 python 运算符的 DAG,它运行 SQL 查询并输出到 .csv。第二个运算符仅返回 true 以生成 DAG。我似乎无法访问我的函数中的 ds 变量。我想这样做是为了传递给查询。
from airflow.models import Variable, DAG
from airflow.hooks import HttpHook, PostgresHook
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
import json
sql_path = Variable.get("sql_path")
date = Variable.get("ds")
first_date = Variable.get("ds")
print date
def get_redshift_data(ds,**kwargs):
pg_hook = PostgresHook(postgres_conn_id='redshift')
params = {'window_start_date':date,'window_end_date':first_date}
with open(sql_path+"/native.sql") as f:
sql_file = f.read() % (params)
df2 = pg_hook.get_pandas_df(sql_file)
df2.to_csv("test_1.csv", encoding = "utf-8")
def print_test(ds, **kwargs):
return True
args = {
'owner': 'Bob', …Run Code Online (Sandbox Code Playgroud)