Airflow:如何从其他服务器SSH和运行BashOperator

CMP*_*MPE 18 python ssh airflow

有没有办法ssh到不同的服务器并使用Airbnb的Airflow运行BashOperator?我正在尝试使用Airflow运行hive sql命令,但我需要SSH到另一个框以运行hive shell.我的任务应如下所示:

  1. SSH到server1
  2. 启动Hive shell
  3. 运行Hive命令

谢谢!

CMP*_*MPE 28

我想我只想出来:

  1. 在管理>连接下的UI中创建SSH连接.注意:如果重置数据库,将删除连接

  2. 在Python文件中添加以下内容

    from airflow.contrib.hooks import SSHHook
    sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
    
    Run Code Online (Sandbox Code Playgroud)
  3. 添加SSH操作员任务

    t1 = SSHExecuteOperator(
        task_id="task1",
        bash_command=<YOUR COMMAND>,
        ssh_hook=sshHook,
        dag=dag)
    
    Run Code Online (Sandbox Code Playgroud)

谢谢!

  • 请注意,您还必须导入运算符:from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator (9认同)
  • 使用最新的气流版本1.10**不推荐使用SSHExecuteOperator**,并且必须使用新的**SSHOperator**.如果有人使用1.10那么新的导入应该来自airflow.contrib.hooks.ssh_hook import SSHHook`和`from airflow.contrib.operators.ssh_operator import SSHOperator`. (8认同)

pol*_*ity 15

有一点要注意与安东的回答是,争论其实ssh_conn_id,不是conn_idSSHOperator对象。至少在1.10版中。

一个简单的例子看起来像

from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(dag_id='testing_stuff',
          default_args=default_args,
          schedule_interval='0,10,20,30,40,50 * * * *',
          dagrun_timeout=timedelta(seconds=120))
# Step 1 - Dump data from postgres databases
t1_bash = """
echo 'Hello World'
"""
t1 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='test_ssh_operator',
    command=t1_bash,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

  • 这应该是apache气流1.10的答案 (2认同)

art*_*ode 11

以下是 Airflow 2 中 ssh 操作符的工作示例:

[注意:该运算符的输出是 base64 编码的]

from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
sshHook = SSHHook(ssh_conn_id="conn-id", key_file='/opt/airflow/keys/ssh.key')
# a hook can also be defined directly in the code:
# sshHook = SSHHook(remote_host='server.com', username='admin', key_file='/opt/airflow/keys/ssh.key')

ls = SSHOperator(
        task_id="ls",
        command= "ls -l",
        ssh_hook = sshHook,
        dag = dag)

Run Code Online (Sandbox Code Playgroud)

conn-id是在管理 -> 连接中设置的。这key_file是 ssh 私钥。