在我的DAG文件中,我定义了一个on_failure_callback()函数,以便在发生故障时发布Slack.
如果我为DAG中的每个运算符指定它,它运行良好:on_failure_callback = on_failure_callback()
有没有办法自动化(例如通过default_args,或通过我的DAG对象)调度到我的所有运营商?
airflow.cfg文件中的日志相关设置是:
remote_base_log_folder =base_log_folder = /home/my_projects/ksaprice_project/airflow/logsworker_log_server_port = 8793 child_process_log_directory =
/home/my_projects/ksaprice_project/airflow/logs/scheduler虽然我正在设置remote_base_log_folter,但它正在尝试从中获取日志http://:8793/log/tutorial/print_date/2017-08-02T00:00:00- 我不明白这种行为.根据设置,工作人员应该将日志存储在,/home/my_projects/ksaprice_project/airflow/logs并且应该从相同位置而不是远程获取它们.
我想使用执行日期作为我的sql文件的参数:
我试过了
dt = '{{ ds }}'
s3_to_redshift = PostgresOperator(
task_id='s3_to_redshift',
postgres_conn_id='redshift',
sql='s3_to_redshift.sql',
params={'file': dt},
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
但它不起作用.
我需要
1. run a select query on MYSQL DB and fetch the records.
2. Records are processed by python script.
Run Code Online (Sandbox Code Playgroud)
我不确定我应该怎么做.xcom是走这条路的吗?此外,MYSQLOperator只执行查询,不获取记录.我可以使用内置传输操作符吗?我如何在这里使用MYSQL钩子?
你可能想要使用一个PythonOperator,它使用钩子来获取数据,应用转换并将(现在得分的)行送回其他地方.
有人可以解释如何进行相同的操作.
请参阅 - http://markmail.org/message/x6nfeo6zhjfeakfe
def do_work():
mysqlserver = MySqlHook(connection_id)
sql = "SELECT * from table where col > 100 "
row_count = mysqlserver.get_records(sql, schema='testdb')
print row_count[0][0]
callMYSQLHook = PythonOperator(
task_id='fetch_from_testdb',
python_callable=mysqlHook,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
这是正确的方法吗?另外我们如何使用xcoms存储以下MySqlOperator的记录?
t = MySqlOperator(
conn_id='mysql_default',
task_id='basic_mysql',
sql="SELECT count(*) from table1 where id > 10",
dag=dag)
Run Code Online (Sandbox Code Playgroud) 我有一个运行气流1.8.0的EC2实例LocalExecutor.根据文档,我预计以下两个命令之一会以守护进程模式引发调度程序:
airflow scheduler --daemon --num_runs=20
要么
airflow scheduler --daemon=True --num_runs=5
但事实并非如此.第一个命令似乎会起作用,但它只返回以下输出,然后返回终端而不产生任何后台任务:
[2017-09-28 18:15:02,794] {__init__.py:57} INFO - Using executor LocalExecutor
[2017-09-28 18:15:03,064] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2017-09-28 18:15:03,203] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
Run Code Online (Sandbox Code Playgroud)
第二个命令产生错误:
airflow scheduler: error: argument -D/--daemon: ignored explicit argument 'True'
Run Code Online (Sandbox Code Playgroud)
这是奇怪的,因为根据文档 --daemon=True应该是airflow scheduler调用的有效参数.
深入挖掘了我的StackOverflow帖子,其中一个响应建议systemd根据此repo可用的代码执行处理气流调度程序作为后台进程.
我对脚本的轻微修改后的修改将发布为以下Gists.我在Ubuntu 16.04.3上使用vanilla m4.xlarge EC2实例:
从那里我打电话给:
sudo systemctl enable airflow-scheduler
sudo …Run Code Online (Sandbox Code Playgroud) 从气流文档:
SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything
Run Code Online (Sandbox Code Playgroud)
我知道subagoperator实际上是作为BackfillJob实现的,因此我们必须向schedule_interval运营商提供.但是,有没有办法获得子schedule_interval="@once"标记的语义等价物?我担心如果我使用set schedule_interval="@daily"为子标记,如果子标记运行时间超过一天,则子标记可能会运行多次.
def subdag_factory(parent_dag_name, child_dag_name, args):
subdag = DAG(
dag_id="{parent_dag_name}.{child_dag_name}".format(
parent_dag_name=parent_dag_name, child_dag_name=child_dag_name
),
schedule_interval="@daily", # <--- this bit here
default_args=args
)
... do more stuff to the subdag here
return subdag
Run Code Online (Sandbox Code Playgroud)
TLDR:如何伪造"每次触发父dag只运行一次这个子标记"
当气流网络服务器显示错误时Broken DAG: [<path/to/dag>] <error>,我们如何以及在何处找到这些异常的完整堆栈跟踪?
我试过这些地方:
/var/log/airflow/webserver- 在执行的时间范围内没有日志,其他日志是二进制的,并且解码时strings没有给出有用的信息.
/var/log/airflow/scheduler - 有一些日志,但是是二进制形式,试图阅读它们,看起来主要是sqlalchemy日志可能是气流的数据库.
/var/log/airflow/worker - 显示运行DAG的日志(与您在气流页面上看到的日志相同)
然后也在/var/log/airflow/rotated- 找不到我正在寻找的堆栈跟踪.
我使用的是airflow v1.7.1.3
我发现自己处于这样一种情况:我手动触发DAG Run(via airflow trigger_dag datablocks_dag)运行,Dag Run显示在界面中,但它会永远保持"Running"而不会实际执行任何操作.
当我在UI中检查此DAG Run时,我看到以下内容:
我已经start_date准备好了datetime(2016, 1, 1),并schedule_interval设置为@once.我从阅读文档中得到的理解是,从start_date现在起,DAG将被触发.在@once确保它只发生一次.
我的日志文件说:
[2017-07-11 21:32:05,359] {jobs.py:343} DagFileProcessor0 INFO - Started process (PID=21217) to work on /home/alex/Desktop/datablocks/tests/.airflow/dags/datablocks_dag.py
[2017-07-11 21:32:05,359] {jobs.py:534} DagFileProcessor0 ERROR - Cannot use more than 1 thread when using sqlite. Setting max_threads to 1
[2017-07-11 21:32:05,365] {jobs.py:1525} DagFileProcessor0 INFO - Processing file /home/alex/Desktop/datablocks/tests/.airflow/dags/datablocks_dag.py for tasks to queue
[2017-07-11 21:32:05,365] {models.py:176} DagFileProcessor0 INFO - Filling up …Run Code Online (Sandbox Code Playgroud) 是否可以通过UI将参数传递给Airflow的作业?
DFA中的AFAIK,'params'参数在python代码中定义,因此无法在运行时更改.
如何在Airflow任务中使用Django模型?
根据官方的Airflow文档,Airflow提供了与数据库(如MySqlHook/PostgresHook/etc)交互的钩子,这些钩子稍后可以在运算符中用于行查询执行.附加核心代码片段:
class MySqlHook(DbApiHook):
conn_name_attr = 'mysql_conn_id'
default_conn_name = 'mysql_default'
supports_autocommit = True
def get_conn(self):
"""
Returns a mysql connection object
"""
conn = self.get_connection(self.mysql_conn_id)
conn_config = {
"user": conn.login,
"passwd": conn.password or ''
}
conn_config["host"] = conn.host or 'localhost'
conn_config["db"] = conn.schema or ''
conn = MySQLdb.connect(**conn_config)
return conn
Run Code Online (Sandbox Code Playgroud)
class MySqlOperator(BaseOperator):
@apply_defaults
def __init__(
self, sql, mysql_conn_id='mysql_default', parameters=None,
autocommit=False, *args, **kwargs):
super(MySqlOperator, self).__init__(*args, **kwargs)
self.mysql_conn_id = mysql_conn_id
self.sql = sql
self.autocommit = autocommit …Run Code Online (Sandbox Code Playgroud) airflow ×10
apache-airflow ×10
python ×4
amazon-ec2 ×1
django ×1
etl ×1
mysql ×1
operators ×1
orm ×1
python-3.x ×1
scheduler ×1
ubuntu-16.04 ×1