标签: apache-airflow

使用 Airflow 运行 .EXE 和 Powershell 任务

我们的系统基本上只是结合 MS SQL Server 运行 C# 和 Powershell 应用程序的 Windows Server。我们有一个内部的 WorkflowManagement 解决方案,它能够运行执行 EXE/BAT/PS1 甚至调用 DLL 函数的任务。

现在我正在评估 Apache Airflow 是否对我们来说是一个更好的解决方案。到目前为止,我的天真计划是在 Linux 机器上运行气流调度程序,然后让消费者在 Windows 机器上运行。但是,例如,我将如何设置使用者以运行 .exe 任务?

我是否需要创建一个接受 HTTP 调用然后执行 .Exe 文件的 Wrapper-Service?

airflow apache-airflow

2
推荐指数
1
解决办法
5243
查看次数

气流实验api dagrun给出400错误:输入参数应该是什么

邮递员对http:// host:8080 / api / experimental / dags / test_flow / dag_runs的POST请求 给出“ 400错误请求:浏览器(或代理)发送了该服务器无法理解的请求”。当它尝试从请求中获取get_json时。即在行数据= request.get_json(force = True)

此API调用的输入应为..?

airflow apache-airflow

2
推荐指数
1
解决办法
802
查看次数

运行时添加到 DAG 的任务无法调度

我的想法是有一个foo生成输入列表(用户、报告、日志文件等)的任务,并为输入列表中的每个元素启动一个任务。目标是利用 Airflow 的重试和其他逻辑,而不是重新实现它。

因此,理想情况下,我的 DAG 应如下所示: 在此处输入图片说明

这里唯一的变量是生成的任务数量。在完成所有这些任务后,我想再做一些任务,因此为每个任务启动一个新的 DAG 似乎并不合适。

这是我的代码:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1)
}

dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)

foo_operator = BashOperator(
    task_id='foo',
    bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),
    xcom_push=True,
    dag=dag)

def gen_nodes(**kwargs):
    ti = kwargs['ti']
    workers = json.loads(ti.xcom_pull(task_ids='foo'))

    for wid in workers:
        print("Iterating worker %s" % wid)
        op = PythonOperator(
            task_id='test_op_%s' % wid,
            python_callable=lambda: print("Dynamic task!"),
            dag=dag
        )

        op.set_downstream(bar_operator)
        op.set_upstream(dummy_op)

gen_subdag_node_op = PythonOperator(
    task_id='gen_subdag_nodes',
    python_callable=gen_nodes,
    provide_context=True,
    dag=dag
)

gen_subdag_node_op.set_upstream(foo_operator)

dummy_op …
Run Code Online (Sandbox Code Playgroud)

airflow apache-airflow airflow-scheduler

2
推荐指数
1
解决办法
1735
查看次数

动态创建任务列表

我有一个 DAG,它是通过查询 DynamoDB 的列表创建的,对于列表中的每个项目,使用 PythonOperator 创建一个任务并将其添加到 DAG。在下面的示例中未显示,但重要的是要注意列表中的某些项目依赖于其他任务,因此我使用它set_upstream来强制执行依赖关系。

- airflow_home
  \- dags
    \- workflow.py
Run Code Online (Sandbox Code Playgroud)

工作流.py

def get_task_list():
    # ... query dynamodb ...

def run_task(task):
    # ... do stuff ...

dag = DAG(dag_id='my_dag', ...)
tasks = get_task_list()
for task in tasks:
    t = PythonOperator(
        task_id=task['id'],
        provide_context=False,
        dag=dag,
        python_callable=run_task,
        op_args=[task]
    )
Run Code Online (Sandbox Code Playgroud)

问题是workflow.py一遍又一遍地运行(每次任务运行时?),我的get_task_list()方法受到 AWS 的限制并抛出异常。

我认为这是因为无论何时run_task()被调用,它都会运行所有全局变量,workflow.py所以我尝试run_task()进入一个单独的模块,如下所示:

- airflow_home
  \- dags
    \- workflow.py
    \- mypackage
      \- __init__
      \- task.py
Run Code Online (Sandbox Code Playgroud)

但这并没有改变任何事情。我什至尝试放入get_task_list()一个用工厂函数包装的 SubDagOperator,它的行为方式仍然相同。 …

airflow apache-airflow

2
推荐指数
1
解决办法
4398
查看次数

Apache Airflow 无法通过 FTP/SFTP 建立到远程主机的连接

我是 Apache Airflow 的新手,到目前为止,我已经能够解决我遇到的问题。

我现在撞墙了。我需要通过 sftp 将文件传输到远程服务器。我没有运气这样做。到目前为止,我已经通过各自的钩子获得了 S3 和 Postgres/Redshift 连接,以在各种 DAG 中工作。我已经能够在本地 FTP 服务器上使用 FTPHook 进行成功测试,但无法弄清楚如何使用 SFTP 连接到远程主机。

我可以使用 FileZilla 通过 SFTP 连接到远程主机,所以我知道我的凭据是正确的。

通过谷歌搜索,我找到了SFTPOperator,但不知道如何使用它。我也找到了FTPSHook,但我仍然无法让它工作。

我的 Airflow 日志中不断出现错误nodename nor servname provided, or not known或一般信息Operation timed out

有人可以指出我正确的方向吗?我应该将 FTPSHook 与 SSH 还是 FTP Airflow Conn 类型一起使用?还是我需要使用 SFTPOperator?我也很困惑我应该如何在我的 Airflow 连接中设置凭据。我使用 SSH 配置文件还是 FTP?

如果我可以提供更多可能有帮助的其他信息,请告诉我。

干杯!

ftp ssh sftp airflow apache-airflow

2
推荐指数
1
解决办法
3438
查看次数

我可以在Windows上运行Airflow吗?

我现在已经研究了几个小时但我无法确认如果到2017年10月,你可以在Windows上运行气流.我已经使用Python包"pip install airflow"安装了它,但我无法初始化它甚至看到版本,我认为它无法在Windows上运行.

python airflow apache-airflow

2
推荐指数
1
解决办法
6373
查看次数

Apache Airflow集群的最低硬件要求

设置Apache Airflow集群的最低硬件要求是什么?

例如。群集中不同类型节点的RAM,CPU,磁盘等。

谢谢

airflow apache-airflow airflow-scheduler

2
推荐指数
2
解决办法
2895
查看次数

气流任务未被触发

我正在安排dag,它显示在运行状态但任务没有被触发.Airflow调度程序和Web服务器启动并运行.我在UI上将dag切换为ON.我仍然无法解决问题.我正在使用CeleryExecutor尝试更改为SequentialExecutor但没有运气.

scheduler airflow apache-airflow

1
推荐指数
1
解决办法
1685
查看次数

在 Airflow 中生成多个任务时颠倒了上游/下游关系

可以在此处找到与此问题相关的原始代码。

我对移位运算符和set_upstream/set_downstream方法都在我在 DAG 中定义的任务循环中工作感到困惑。当 DAG 的主执行循环配置如下:

for uid in dash_workers.get_id_creds():
    clear_tables.set_downstream(id_worker(uid))
Run Code Online (Sandbox Code Playgroud)

或者

for uid in dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)
Run Code Online (Sandbox Code Playgroud)

该图如下所示(字母数字序列是用户 ID,也定义了任务 ID):

在此处输入图片说明

当我像这样配置 DAG 的主执行循环时:

for uid in dash_workers.get_id_creds():
    clear_tables.set_upstream(id_worker(uid))
Run Code Online (Sandbox Code Playgroud)

或者

for uid in dash_workers.get_id_creds():
    id_worker(uid) >> clear_tables
Run Code Online (Sandbox Code Playgroud)

该图如下所示:

在此处输入图片说明

第二张图是我想要的/我希望根据我对文档的阅读而生成的前两个代码片段。如果我想clear_tables在为不同的用户 ID 触发我的一批数据解析任务之前先执行,我应该将其表示为clear_tables >> id_worker(uid)

编辑- 这是完整的代码,自我发布最后几个问题以来已经更新,以供参考:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, …
Run Code Online (Sandbox Code Playgroud)

python python-3.x airflow apache-airflow

1
推荐指数
1
解决办法
1981
查看次数

使用 systemd 的气流:`airflow.pid` 与 `airflow-monitor.pid`

我的 systemd 单元文件正在工作(如下)。

但是,该airflow-monitor.pid文件会暂时变为只读,这有时会阻止气流启动。如果发生这种情况,我们的解决方法是删除airflow-monitor.pid。这与airflow.pid 不是同一个文件。

它看起来像airflow.pidgunicorn 并且airflow-monitor.pid是一个 python 进程作为气流网络服务器。

systemd 单元文件:

[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service

[Service]
# by default we just set $AIRFLOW_HOME to its default dir: $HOME/airflow , so lets skip this for now
EnvironmentFile=/home/airflow/airflow/airflow.systemd.environment

#WorkingDirectory=/home/airflow/airflow-venv
#Environment=PATH="/home/airflow/airflow-venv/bin:$PATH"
PIDFile=/home/airflow/airflow/airflow.pid
User=airflow
Group=airflow
Type=simple
# this was originally the file webserver.pid but did not run
#ExecStart=/bin/bash -c 'source /home/airflow/airflow-venv/bin/activate ; /home/airflow/airflow-venv/bin/airflow webserver -p 8080 --pid /home/airflow/airflow/airflow.pid --daemon' …
Run Code Online (Sandbox Code Playgroud)

python systemd airflow apache-airflow

1
推荐指数
1
解决办法
1057
查看次数

气流-如何忽略回填中成功完成的任务?

我已经在DAG中添加了新任务,它需要回填它们。目前,当我运行airflow backfill它时,它将运行所有任务(新任务和旧任务),而我想忽略已经成功的旧任务。

有什么方法可以在回填中跳过具有成功状态的任务?

airflow apache-airflow

1
推荐指数
1
解决办法
1032
查看次数

如何在Airflow中实施轮询?

我想使用Airflow实施数据流,这些数据流定期轮询外部系统(ftp服务器等),检查是否符合特定条件的新文件,然后为这些文件运行一堆任务。现在,我是Airflow的新手,并且读到Sensor是在这种情况下要使用的东西,实际上我设法编写了一个在运行“ airflow test”时可以正常工作的传感器。但是我对于传感器的poke_interval和DAG调度的关系有些困惑。如何为用例定义这些设置?还是应该使用其他方法?我只希望Airflow在这些文件可用时运行任务,而不是在一段时间内没有新文件可用时使仪表板充满故障。

airflow apache-airflow

0
推荐指数
1
解决办法
1213
查看次数