标签: airflow

如何从UI停止/终止Airflow任务

如何在Airflow UI上停止/终止正在运行的任务?我在用LocalExecutor.即使我使用CeleryExecutor,我怎么能杀死/停止正在运行的任务?

python apache hadoop airflow apache-airflow

29
推荐指数
5
解决办法
3万
查看次数

气流将参数传递给从属任务

将参数传递给Airflow中的相关任务的方法是什么?我有很多bashes文件,我正在尝试将此方法迁移到气流,但我不知道如何在任务之间传递一些属性.

这是一个真实的例子:

#sqoop bash template
sqoop_template = """
        sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/
    """

s3_template = """
        s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}}
    """



#Task of extraction in EMR
t1 = BashOperator(
        task_id='extract_account', 
        bash_command=sqoop_template, 
        params={'job': 'job', 'dir': 'hdfs:///account/' + time.now().strftime("%Y-%m-%d-%H-%M-%S")},
        dag=dag)
#Task to upload in s3 backup.
t2 = BashOperator(
        task_id='s3_upload',
        bash_command=s3_template,
        params={}, #here i need the dir name created in t1
        depends_on_past=True
    )

t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

在t2中,我需要访问在t1中创建的目录名称.

#Execute a valid job sqoop
def sqoop_import(table_name, job_name):
    s3, hdfs = …
Run Code Online (Sandbox Code Playgroud)

bash airflow

28
推荐指数
1
解决办法
2万
查看次数

Airbnb Airflow与Apache Nifi

Airflow和Nifi在工作流程中执行相同的工作吗?每个人的赞成/赞成是什么?我需要读取一些json文件,向其添加更多自定义元数据并将其放入要处理的Kafka队列中.我能够在Nifi中做到这一点.我还在研究Airflow.我正在尝试为我的项目选择最好的工作流引擎谢谢!

python airflow apache-nifi

26
推荐指数
1
解决办法
2万
查看次数

如何在Airflow上重新启动失败的任务

我使用的是LocalExecutor,我的dag有3个任务,其中任务(C)依赖于任务(A).任务(B)和任务(A)可以并行运行,如下所示

A - >Ç

所以任务(A)失败了,但任务(B)运行正常.任务(C)尚未运行,因为任务(A)失败.

我的问题是我如何单独运行任务(A),因此任务(A)运行一旦任务(A)完成,并且Airflow UI将它们标记为成功.

python hadoop bigdata airflow apache-airflow

25
推荐指数
2
解决办法
1万
查看次数

气流Python单元测试?

我想为我们的DAG添加一些单元测试,但找不到任何单元测试.DAG的单元测试是否有框架?存在一个端到端的测试框架,但我猜它已经死了:https://issues.apache.org/jira/browse/AIRFLOW-79.请建议,谢谢!

python airflow

25
推荐指数
2
解决办法
4596
查看次数

如何在Windows上运行Airflow

运行Airflow的常用说明不适用于Windows环境:

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install airflow

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080
Run Code Online (Sandbox Code Playgroud)

Airflow实用程序在命令行中不可用,我无法在其他地方找到它以手动添加.Airflow如何在Windows上运行?

python windows flask flask-admin airflow

24
推荐指数
6
解决办法
3万
查看次数

写入Airflow日志

写在气流日志的一种方法是从PythonOperator返回一个字符串,像第44行这里.

还有其他方法可以写入气流日志文件吗?我发现print语句没有保存到日志中.

airflow

23
推荐指数
3
解决办法
1万
查看次数

无法导入Airflow插件

继气流教程这里.

问题:Web服务器返回以下错误

Broken DAG: [/usr/local/airflow/dags/test_operator.py] cannot import name 
MyFirstOperator
Run Code Online (Sandbox Code Playgroud)

注意: 目录结构如下所示:

airflow_home
??? airflow.cfg
??? airflow.db
??? dags
?   ??? test_operators.py  
??? plugins
?   ??? my_operators.py   
??? unittests.cfg
Run Code Online (Sandbox Code Playgroud)

我试图在'test_operators.py'中导入插件,如下所示:

from airflow.operators import MyFirstOperator
Run Code Online (Sandbox Code Playgroud)

代码与教程中的代码完全相同.

airflow

23
推荐指数
5
解决办法
2万
查看次数

Airflow - 如何将xcom变量传递给Python函数

我需要引用一个由a返回的变量BashOperator.我可能做错了所以请原谅我.在我task_archive_s3_file,我需要从中获取文件名get_s3_file.任务只是打印{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}为字符串而不是值.

如果我使用bash_command,则值正确打印.

get_s3_file = PythonOperator(
    task_id='get_s3_file',
    python_callable=obj.func_get_s3_file,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag)

submit_file_to_spark = BashOperator(
    task_id='submit_file_to_spark',
    bash_command="echo 'hello world'",
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag)

task_archive_s3_file = PythonOperator(
    task_id='archive_s3_file',
#    bash_command="echo {{ ti.xcom_pull(task_ids='submit_file_to_spark') }}",
    python_callable=obj.func_archive_s3_file,
    params={'s3_path_filename': "{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}" },
    dag=dag)

get_s3_file >> submit_file_to_spark >> task_archive_s3_file
Run Code Online (Sandbox Code Playgroud)

airflow

23
推荐指数
4
解决办法
3万
查看次数

Apache Airflow或Apache Beam用于数据处理和作业调度

我正在尝试提供有用的信息,但我远非数据工程师.

我目前正在使用python库pandas对我的数据执行一系列转换,这些数据有很多输入(目前是CSV和excel文件).输出是几个excel文件.我希望能够通过并行计算执行计划的受监视批处理作业(我的意思是不像我正在做的那样使用pandas),每月一次.

我真的不知道Beam或Airflow,我很快就通读了文档,似乎两者都可以实现.我应该使用哪一个?

pandas airflow apache-beam

23
推荐指数
3
解决办法
9560
查看次数