标签: apache-airflow

气流默认为on_failure_callback

在我的DAG文件中,我定义了一个on_failure_callback()函数,以便在发生故障时发布Slack.

如果我为DAG中的每个运算符指定它,它运行良好:on_failure_callback = on_failure_callback()

有没有办法自动化(例如通过default_args,或通过我的DAG对象)调度到我的所有运营商?

python operators airflow apache-airflow

14
推荐指数
1
解决办法
1万
查看次数

气流:日志文件不是本地的,不支持的远程日志位置

我无法从Airflow UI中看到附加到任务的日志: 在此输入图像描述

airflow.cfg文件中的日志相关设置是:

  • remote_base_log_folder =
  • base_log_folder = /home/my_projects/ksaprice_project/airflow/logs
  • worker_log_server_port = 8793
  • child_process_log_directory = /home/my_projects/ksaprice_project/airflow/logs/scheduler

虽然我正在设置remote_base_log_folter,但它正在尝试从中获取日志http://:8793/log/tutorial/print_date/2017-08-02T00:00:00- 我不明白这种行为.根据设置,工作人员应该将日志存储在,/home/my_projects/ksaprice_project/airflow/logs并且应该从相同位置而不是远程获取它们.

更新 task_instance表内容: 在此输入图像描述

airflow apache-airflow airflow-scheduler

13
推荐指数
2
解决办法
8201
查看次数

Airflow:将{{ds}}作为参数传递给PostgresOperator

我想使用执行日期作为我的sql文件的参数:

我试过了

dt = '{{ ds }}'

s3_to_redshift = PostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    params={'file': dt},
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

但它不起作用.

python scheduler airflow apache-airflow

12
推荐指数
1
解决办法
6533
查看次数

如何使用Airflow获取和处理mysql记录?

我需要

1. run a select query on MYSQL DB and fetch the records.              
2. Records are processed by python script.
Run Code Online (Sandbox Code Playgroud)

我不确定我应该怎么做.xcom是走这条路的吗?此外,MYSQLOperator只执行查询,不获取记录.我可以使用内置传输操作符吗?我如何在这里使用MYSQL钩子?

你可能想要使用一个PythonOperator,它使用钩子来获取数据,应用转换并将(现在得分的)行送回其他地方.

有人可以解释如何进行相同的操作.

请参阅 - http://markmail.org/message/x6nfeo6zhjfeakfe

def do_work():
    mysqlserver = MySqlHook(connection_id)
    sql = "SELECT * from table where col > 100 "
    row_count = mysqlserver.get_records(sql, schema='testdb')
    print row_count[0][0]

callMYSQLHook = PythonOperator(
    task_id='fetch_from_testdb',
    python_callable=mysqlHook,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

这是正确的方法吗?另外我们如何使用xcoms存储以下MySqlOperator的记录?

t = MySqlOperator(
conn_id='mysql_default',
task_id='basic_mysql',
sql="SELECT count(*) from table1 where id > 10",
dag=dag)
Run Code Online (Sandbox Code Playgroud)

mysql python-3.x airflow apache-airflow

12
推荐指数
2
解决办法
7279
查看次数

将气流调度程序作为守护进程运行的问题

我有一个运行气流1.8.0的EC2实例LocalExecutor.根据文档,我预计以下两个命令之一会以守护进程模式引发调度程序:

airflow scheduler --daemon --num_runs=20

要么

airflow scheduler --daemon=True --num_runs=5

但事实并非如此.第一个命令似乎会起作用,但它只返回以下输出,然后返回终端而不产生任何后台任务:

[2017-09-28 18:15:02,794] {__init__.py:57} INFO - Using executor LocalExecutor
[2017-09-28 18:15:03,064] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2017-09-28 18:15:03,203] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
Run Code Online (Sandbox Code Playgroud)

第二个命令产生错误:

airflow scheduler: error: argument -D/--daemon: ignored explicit argument 'True'
Run Code Online (Sandbox Code Playgroud)

这是奇怪的,因为根据文档 --daemon=True应该是airflow scheduler调用的有效参数.

深入挖掘了我的StackOverflow帖子,其中一个响应建议systemd根据此repo可用的代码执行处理气流调度程序作为后台进程.

我对脚本的轻微修改后的修改将发布为以下Gists.我在Ubuntu 16.04.3上使用vanilla m4.xlarge EC2实例:

从那里我打电话给:

sudo systemctl enable airflow-scheduler
sudo …
Run Code Online (Sandbox Code Playgroud)

python amazon-ec2 airflow ubuntu-16.04 apache-airflow

12
推荐指数
1
解决办法
1万
查看次数

气流:模式运行气流子标记一次

从气流文档:

SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything
Run Code Online (Sandbox Code Playgroud)

我知道subagoperator实际上是作为BackfillJob实现的,因此我们必须向schedule_interval运营商提供.但是,有没有办法获得子schedule_interval="@once"标记的语义等价物?我担心如果我使用set schedule_interval="@daily"为子标记,如果子标记运行时间超过一天,则子标记可能会运行多次.

def subdag_factory(parent_dag_name, child_dag_name, args):
    subdag = DAG(
        dag_id="{parent_dag_name}.{child_dag_name}".format(
            parent_dag_name=parent_dag_name, child_dag_name=child_dag_name
        ),
        schedule_interval="@daily", # <--- this bit here
        default_args=args
    )

    ... do more stuff to the subdag here
    return subdag
Run Code Online (Sandbox Code Playgroud)

TLDR:如何伪造"每次触发父dag只运行一次这个子标记"

python etl directed-acyclic-graphs airflow apache-airflow

11
推荐指数
2
解决办法
4245
查看次数

调试损坏的DAG

当气流网络服务器显示错误时Broken DAG: [<path/to/dag>] <error>,我们如何以及在何处找到这些异常的完整堆栈跟踪?

我试过这些地方:

/var/log/airflow/webserver- 在执行的时间范围内没有日志,其他日志是二进制的,并且解码时strings没有给出有用的信息.

/var/log/airflow/scheduler - 有一些日志,但是是二进制形式,试图阅读它们,看起来主要是sqlalchemy日志可能是气流的数据库.

/var/log/airflow/worker - 显示运行DAG的日志(与您在气流页面上看到的日志相同)

然后也在/var/log/airflow/rotated- 找不到我正在寻找的堆栈跟踪.

我使用的是airflow v1.7.1.3

airflow apache-airflow

11
推荐指数
2
解决办法
9259
查看次数

气流DAG运行已触发,但从未执行过?

我发现自己处于这样一种情况:我手动触发DAG Run(via airflow trigger_dag datablocks_dag)运行,Dag Run显示在界面中,但它会永远保持"Running"而不会实际执行任何操作.

当我在UI中检查此DAG Run时,我看到以下内容:

在此输入图像描述

我已经start_date准备好了datetime(2016, 1, 1),并schedule_interval设置为@once.从阅读文档中得到的理解是,从start_date现在起,DAG将被触发.在@once确保它只发生一次.

我的日志文件说:

[2017-07-11 21:32:05,359] {jobs.py:343} DagFileProcessor0 INFO - Started process (PID=21217) to work on /home/alex/Desktop/datablocks/tests/.airflow/dags/datablocks_dag.py
[2017-07-11 21:32:05,359] {jobs.py:534} DagFileProcessor0 ERROR - Cannot use more than 1 thread when using sqlite. Setting max_threads to 1
[2017-07-11 21:32:05,365] {jobs.py:1525} DagFileProcessor0 INFO - Processing file /home/alex/Desktop/datablocks/tests/.airflow/dags/datablocks_dag.py for tasks to queue
[2017-07-11 21:32:05,365] {models.py:176} DagFileProcessor0 INFO - Filling up …
Run Code Online (Sandbox Code Playgroud)

airflow apache-airflow

11
推荐指数
1
解决办法
3187
查看次数

通过UI将参数传递给Airflow的作业

是否可以通过UI将参数传递给Airflow的作业?

DFA中的AFAIK,'params'参数在python代码中定义,因此无法在运行时更改.

orchestration airflow apache-airflow

10
推荐指数
2
解决办法
4037
查看次数

(Django)气流中的ORM - 有可能吗?

如何在Airflow任务中使用Django模型?

根据官方的Airflow文档,Airflow提供了与数据库(如MySqlHook/PostgresHook/etc)交互的钩子,这些钩子稍后可以在运算符中用于行查询执行.附加核心代码片段:

https://airflow.apache.org/_modules/mysql_hook.html复制

class MySqlHook(DbApiHook):
    conn_name_attr = 'mysql_conn_id'
    default_conn_name = 'mysql_default'
    supports_autocommit = True

    def get_conn(self):
        """
        Returns a mysql connection object
        """
        conn = self.get_connection(self.mysql_conn_id)
        conn_config = {
            "user": conn.login,
            "passwd": conn.password or ''
        }
        conn_config["host"] = conn.host or 'localhost'
        conn_config["db"] = conn.schema or ''
        conn = MySQLdb.connect(**conn_config)
        return conn
Run Code Online (Sandbox Code Playgroud)

https://airflow.apache.org/_modules/mysql_operator.html复制

class MySqlOperator(BaseOperator):
    @apply_defaults
    def __init__(
            self, sql, mysql_conn_id='mysql_default', parameters=None,
            autocommit=False, *args, **kwargs):
        super(MySqlOperator, self).__init__(*args, **kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.autocommit = autocommit …
Run Code Online (Sandbox Code Playgroud)

django orm django-models airflow apache-airflow

10
推荐指数
1
解决办法
2239
查看次数