相关疑难解决方法(0)

airflow中的execution_date:需要作为变量访问

我真的是这个论坛的新手.但是我一直在为我们的公司玩气流.对不起,这个问题听起来真的很蠢.

我正在使用一堆BashOperator编写一个管道.基本上,对于每个任务,我想简单地使用'curl'调用REST api

这是我的管道看起来像(非常简化的版本):

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['xxxx@xxx.xxx'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

如果你注意到我在做什么current_datetime= datetime_obj.now(tz=tz.tzlocal()) 而不是我想要的是'execution_date'

如何直接使用'execution_date'并将其分配给我的python文件中的变量?

我有这个访问args的一般问题.任何帮助将得到真诚的感谢.

谢谢

airflow

45
推荐指数
5
解决办法
5万
查看次数

气流中的Python脚本调度

嗨,大家好,

我需要使用airflow来安排我的python 文件(其中包含从sql和一些连接中提取数据).我已经成功地将气流安装到我的linux服务器中,我可以使用气流网络服务器.但即使在完成文档后,我也不清楚我需要在哪里编写脚本以进行调度以及该脚本如何在airflow webserver中可用,这样我才能看到状态

就配置而言,我知道dag文件夹在我的主目录中的位置以及示例dags所在的位置.

注意:请不要将此标记为重复与如何在Airflow中运行bash脚本文件,因为我需要运行位于不同位置的python文件.

请在Airflow网络服务器中找到以下配置:

在此输入图像描述

下面是AIRFLOW_HOME目录中dag文件夹的屏幕截图

在此输入图像描述

还可以在下面找到DAG创建屏幕截图和Missing DAG错误的屏幕截图

在此输入图像描述

在此输入图像描述

在我选择简单的 DAG之后,填充了丢失DAG的错误

在此输入图像描述

python scheduling reload apache-spark airflow

11
推荐指数
2
解决办法
2万
查看次数

How to pass parameter to PythonOperator in Airflow

I just started using Airflow, can anyone enlighten me how to pass a parameter into PythonOperator like below:

t5_send_notification = PythonOperator(
    task_id='t5_send_notification',
    provide_context=True,
    python_callable=SendEmail,
    op_kwargs=None,
    #op_kwargs=(key1='value1', key2='value2'),
    dag=dag,
)

def SendEmail(**kwargs):
    msg = MIMEText("The pipeline for client1 is completed, please check.")
    msg['Subject'] = "xxxx"
    msg['From'] = "xxxx"
    ......
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()
Run Code Online (Sandbox Code Playgroud)

I would like to be able to pass some parameters into the t5_send_notification's callable which is SendEmail, ideally I want to attach the full …

python airflow

6
推荐指数
2
解决办法
4496
查看次数

函数内的气流 pythonOperator ds 变量

我知道之前有人问过这个问题,但没有一个答案回答它。我开始有点发疯了!我很困惑所以真的很感激一些帮助。

我有一个带有 python 运算符的 DAG,它运行 SQL 查询并输出到 .csv。第二个运算符仅返回 true 以生成 DAG。我似乎无法访问我的函数中的 ds 变量。我想这样做是为了传递给查询。

from airflow.models import Variable, DAG
from airflow.hooks import HttpHook, PostgresHook
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
import json


sql_path = Variable.get("sql_path")
date = Variable.get("ds")
first_date = Variable.get("ds")

print date

def get_redshift_data(ds,**kwargs):
    pg_hook = PostgresHook(postgres_conn_id='redshift')
    params = {'window_start_date':date,'window_end_date':first_date}
    with open(sql_path+"/native.sql") as f:
        sql_file = f.read() % (params)
    df2 = pg_hook.get_pandas_df(sql_file)
    df2.to_csv("test_1.csv", encoding = "utf-8")

def print_test(ds, **kwargs):
    return True

args = {
    'owner': 'Bob', …
Run Code Online (Sandbox Code Playgroud)

python jinja2 amazon-redshift airflow

0
推荐指数
1
解决办法
6509
查看次数