我们的系统基本上只是结合 MS SQL Server 运行 C# 和 Powershell 应用程序的 Windows Server。我们有一个内部的 WorkflowManagement 解决方案,它能够运行执行 EXE/BAT/PS1 甚至调用 DLL 函数的任务。
现在我正在评估 Apache Airflow 是否对我们来说是一个更好的解决方案。到目前为止,我的天真计划是在 Linux 机器上运行气流调度程序,然后让消费者在 Windows 机器上运行。但是,例如,我将如何设置使用者以运行 .exe 任务?
我是否需要创建一个接受 HTTP 调用然后执行 .Exe 文件的 Wrapper-Service?
邮递员对http:// host:8080 / api / experimental / dags / test_flow / dag_runs的POST请求 给出“ 400错误请求:浏览器(或代理)发送了该服务器无法理解的请求”。当它尝试从请求中获取get_json时。即在行数据= request.get_json(force = True)
此API调用的输入应为..?
我的想法是有一个foo生成输入列表(用户、报告、日志文件等)的任务,并为输入列表中的每个元素启动一个任务。目标是利用 Airflow 的重试和其他逻辑,而不是重新实现它。
这里唯一的变量是生成的任务数量。在完成所有这些任务后,我想再做一些任务,因此为每个任务启动一个新的 DAG 似乎并不合适。
这是我的代码:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1)
}
dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)
foo_operator = BashOperator(
task_id='foo',
bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),
xcom_push=True,
dag=dag)
def gen_nodes(**kwargs):
ti = kwargs['ti']
workers = json.loads(ti.xcom_pull(task_ids='foo'))
for wid in workers:
print("Iterating worker %s" % wid)
op = PythonOperator(
task_id='test_op_%s' % wid,
python_callable=lambda: print("Dynamic task!"),
dag=dag
)
op.set_downstream(bar_operator)
op.set_upstream(dummy_op)
gen_subdag_node_op = PythonOperator(
task_id='gen_subdag_nodes',
python_callable=gen_nodes,
provide_context=True,
dag=dag
)
gen_subdag_node_op.set_upstream(foo_operator)
dummy_op …Run Code Online (Sandbox Code Playgroud) 我有一个 DAG,它是通过查询 DynamoDB 的列表创建的,对于列表中的每个项目,使用 PythonOperator 创建一个任务并将其添加到 DAG。在下面的示例中未显示,但重要的是要注意列表中的某些项目依赖于其他任务,因此我使用它set_upstream来强制执行依赖关系。
- airflow_home
\- dags
\- workflow.py
Run Code Online (Sandbox Code Playgroud)
工作流.py
def get_task_list():
# ... query dynamodb ...
def run_task(task):
# ... do stuff ...
dag = DAG(dag_id='my_dag', ...)
tasks = get_task_list()
for task in tasks:
t = PythonOperator(
task_id=task['id'],
provide_context=False,
dag=dag,
python_callable=run_task,
op_args=[task]
)
Run Code Online (Sandbox Code Playgroud)
问题是workflow.py一遍又一遍地运行(每次任务运行时?),我的get_task_list()方法受到 AWS 的限制并抛出异常。
我认为这是因为无论何时run_task()被调用,它都会运行所有全局变量,workflow.py所以我尝试run_task()进入一个单独的模块,如下所示:
- airflow_home
\- dags
\- workflow.py
\- mypackage
\- __init__
\- task.py
Run Code Online (Sandbox Code Playgroud)
但这并没有改变任何事情。我什至尝试放入get_task_list()一个用工厂函数包装的 SubDagOperator,它的行为方式仍然相同。 …
我是 Apache Airflow 的新手,到目前为止,我已经能够解决我遇到的问题。
我现在撞墙了。我需要通过 sftp 将文件传输到远程服务器。我没有运气这样做。到目前为止,我已经通过各自的钩子获得了 S3 和 Postgres/Redshift 连接,以在各种 DAG 中工作。我已经能够在本地 FTP 服务器上使用 FTPHook 进行成功测试,但无法弄清楚如何使用 SFTP 连接到远程主机。
我可以使用 FileZilla 通过 SFTP 连接到远程主机,所以我知道我的凭据是正确的。
通过谷歌搜索,我找到了SFTPOperator,但不知道如何使用它。我也找到了FTPSHook,但我仍然无法让它工作。
我的 Airflow 日志中不断出现错误nodename nor servname provided, or not known或一般信息Operation timed out。
有人可以指出我正确的方向吗?我应该将 FTPSHook 与 SSH 还是 FTP Airflow Conn 类型一起使用?还是我需要使用 SFTPOperator?我也很困惑我应该如何在我的 Airflow 连接中设置凭据。我使用 SSH 配置文件还是 FTP?
如果我可以提供更多可能有帮助的其他信息,请告诉我。
干杯!
我现在已经研究了几个小时但我无法确认如果到2017年10月,你可以在Windows上运行气流.我已经使用Python包"pip install airflow"安装了它,但我无法初始化它甚至看到版本,我认为它无法在Windows上运行.
设置Apache Airflow集群的最低硬件要求是什么?
例如。群集中不同类型节点的RAM,CPU,磁盘等。
谢谢
我正在安排dag,它显示在运行状态但任务没有被触发.Airflow调度程序和Web服务器启动并运行.我在UI上将dag切换为ON.我仍然无法解决问题.我正在使用CeleryExecutor尝试更改为SequentialExecutor但没有运气.
可以在此处找到与此问题相关的原始代码。
我对移位运算符和set_upstream/set_downstream方法都在我在 DAG 中定义的任务循环中工作感到困惑。当 DAG 的主执行循环配置如下:
for uid in dash_workers.get_id_creds():
clear_tables.set_downstream(id_worker(uid))
Run Code Online (Sandbox Code Playgroud)
或者
for uid in dash_workers.get_id_creds():
clear_tables >> id_worker(uid)
Run Code Online (Sandbox Code Playgroud)
该图如下所示(字母数字序列是用户 ID,也定义了任务 ID):
当我像这样配置 DAG 的主执行循环时:
for uid in dash_workers.get_id_creds():
clear_tables.set_upstream(id_worker(uid))
Run Code Online (Sandbox Code Playgroud)
或者
for uid in dash_workers.get_id_creds():
id_worker(uid) >> clear_tables
Run Code Online (Sandbox Code Playgroud)
该图如下所示:
第二张图是我想要的/我希望根据我对文档的阅读而生成的前两个代码片段。如果我想clear_tables在为不同的用户 ID 触发我的一批数据解析任务之前先执行,我应该将其表示为clear_tables >> id_worker(uid)
编辑- 这是完整的代码,自我发布最后几个问题以来已经更新,以供参考:
from datetime import datetime
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, …Run Code Online (Sandbox Code Playgroud) 我的 systemd 单元文件正在工作(如下)。
但是,该airflow-monitor.pid文件会暂时变为只读,这有时会阻止气流启动。如果发生这种情况,我们的解决方法是删除airflow-monitor.pid。这与airflow.pid 不是同一个文件。
它看起来像airflow.pidgunicorn 并且airflow-monitor.pid是一个 python 进程作为气流网络服务器。
systemd 单元文件:
[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service
[Service]
# by default we just set $AIRFLOW_HOME to its default dir: $HOME/airflow , so lets skip this for now
EnvironmentFile=/home/airflow/airflow/airflow.systemd.environment
#WorkingDirectory=/home/airflow/airflow-venv
#Environment=PATH="/home/airflow/airflow-venv/bin:$PATH"
PIDFile=/home/airflow/airflow/airflow.pid
User=airflow
Group=airflow
Type=simple
# this was originally the file webserver.pid but did not run
#ExecStart=/bin/bash -c 'source /home/airflow/airflow-venv/bin/activate ; /home/airflow/airflow-venv/bin/airflow webserver -p 8080 --pid /home/airflow/airflow/airflow.pid --daemon' …Run Code Online (Sandbox Code Playgroud) 我已经在DAG中添加了新任务,它需要回填它们。目前,当我运行airflow backfill它时,它将运行所有任务(新任务和旧任务),而我想忽略已经成功的旧任务。
有什么方法可以在回填中跳过具有成功状态的任务?
我想使用Airflow实施数据流,这些数据流定期轮询外部系统(ftp服务器等),检查是否符合特定条件的新文件,然后为这些文件运行一堆任务。现在,我是Airflow的新手,并且读到Sensor是在这种情况下要使用的东西,实际上我设法编写了一个在运行“ airflow test”时可以正常工作的传感器。但是我对于传感器的poke_interval和DAG调度的关系有些困惑。如何为用例定义这些设置?还是应该使用其他方法?我只希望Airflow在这些文件可用时运行任务,而不是在一段时间内没有新文件可用时使仪表板充满故障。
airflow ×12
apache-airflow ×12
python ×3
ftp ×1
python-3.x ×1
scheduler ×1
sftp ×1
ssh ×1
systemd ×1