如何在Airflow UI上停止/终止正在运行的任务?我在用LocalExecutor
.即使我使用CeleryExecutor
,我怎么能杀死/停止正在运行的任务?
将参数传递给Airflow中的相关任务的方法是什么?我有很多bashes文件,我正在尝试将此方法迁移到气流,但我不知道如何在任务之间传递一些属性.
这是一个真实的例子:
#sqoop bash template
sqoop_template = """
sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/
"""
s3_template = """
s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}}
"""
#Task of extraction in EMR
t1 = BashOperator(
task_id='extract_account',
bash_command=sqoop_template,
params={'job': 'job', 'dir': 'hdfs:///account/' + time.now().strftime("%Y-%m-%d-%H-%M-%S")},
dag=dag)
#Task to upload in s3 backup.
t2 = BashOperator(
task_id='s3_upload',
bash_command=s3_template,
params={}, #here i need the dir name created in t1
depends_on_past=True
)
t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)
在t2中,我需要访问在t1中创建的目录名称.
#Execute a valid job sqoop
def sqoop_import(table_name, job_name):
s3, hdfs = …
Run Code Online (Sandbox Code Playgroud) Airflow和Nifi在工作流程中执行相同的工作吗?每个人的赞成/赞成是什么?我需要读取一些json文件,向其添加更多自定义元数据并将其放入要处理的Kafka队列中.我能够在Nifi中做到这一点.我还在研究Airflow.我正在尝试为我的项目选择最好的工作流引擎谢谢!
我使用的是LocalExecutor,我的dag有3个任务,其中任务(C)依赖于任务(A).任务(B)和任务(A)可以并行运行,如下所示
A - >Ç
乙
所以任务(A)失败了,但任务(B)运行正常.任务(C)尚未运行,因为任务(A)失败.
我的问题是我如何单独运行任务(A),因此任务(A)运行一旦任务(A)完成,并且Airflow UI将它们标记为成功.
我想为我们的DAG添加一些单元测试,但找不到任何单元测试.DAG的单元测试是否有框架?存在一个端到端的测试框架,但我猜它已经死了:https://issues.apache.org/jira/browse/AIRFLOW-79.请建议,谢谢!
运行Airflow的常用说明不适用于Windows环境:
# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow
# install from pypi using pip
pip install airflow
# initialize the database
airflow initdb
# start the web server, default port is 8080
airflow webserver -p 8080
Run Code Online (Sandbox Code Playgroud)
Airflow实用程序在命令行中不可用,我无法在其他地方找到它以手动添加.Airflow如何在Windows上运行?
写在气流日志的一种方法是从PythonOperator返回一个字符串,像第44行这里.
还有其他方法可以写入气流日志文件吗?我发现print语句没有保存到日志中.
继气流教程这里.
问题:Web服务器返回以下错误
Broken DAG: [/usr/local/airflow/dags/test_operator.py] cannot import name
MyFirstOperator
Run Code Online (Sandbox Code Playgroud)
注意: 目录结构如下所示:
airflow_home
??? airflow.cfg
??? airflow.db
??? dags
? ??? test_operators.py
??? plugins
? ??? my_operators.py
??? unittests.cfg
Run Code Online (Sandbox Code Playgroud)
我试图在'test_operators.py'中导入插件,如下所示:
from airflow.operators import MyFirstOperator
Run Code Online (Sandbox Code Playgroud)
代码与教程中的代码完全相同.
我需要引用一个由a返回的变量BashOperator
.我可能做错了所以请原谅我.在我task_archive_s3_file
,我需要从中获取文件名get_s3_file
.任务只是打印{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}
为字符串而不是值.
如果我使用bash_command
,则值正确打印.
get_s3_file = PythonOperator(
task_id='get_s3_file',
python_callable=obj.func_get_s3_file,
trigger_rule=TriggerRule.ALL_SUCCESS,
dag=dag)
submit_file_to_spark = BashOperator(
task_id='submit_file_to_spark',
bash_command="echo 'hello world'",
trigger_rule="all_done",
xcom_push=True,
dag=dag)
task_archive_s3_file = PythonOperator(
task_id='archive_s3_file',
# bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
python_callable=obj.func_archive_s3_file,
params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
dag=dag)
get_s3_file >> submit_file_to_spark >> task_archive_s3_file
Run Code Online (Sandbox Code Playgroud) 我正在尝试提供有用的信息,但我远非数据工程师.
我目前正在使用python库pandas对我的数据执行一系列转换,这些数据有很多输入(目前是CSV和excel文件).输出是几个excel文件.我希望能够通过并行计算执行计划的受监视批处理作业(我的意思是不像我正在做的那样使用pandas),每月一次.
我真的不知道Beam或Airflow,我很快就通读了文档,似乎两者都可以实现.我应该使用哪一个?
airflow ×10
python ×5
hadoop ×2
apache ×1
apache-beam ×1
apache-nifi ×1
bash ×1
bigdata ×1
flask ×1
flask-admin ×1
pandas ×1
windows ×1