标签: airflow-operator

虚拟运算符的用例

我在学习 apache 气流,发现有一个叫做 DummyOperator 的操作符。我用谷歌搜索了它的用例,但找不到任何我能理解的东西。这里有人可以讨论它的用例吗?

airflow airflow-operator

16
推荐指数
2
解决办法
1万
查看次数

为什么在 Apache Airflow 中使用 CustomOperator 而不是 PythonOperator?

当我使用 Apache Airflow 时,我似乎无法找到为什么有人会创建CustomOperator一个PythonOperator. 如果我在 aPythonOperator而不是 a 中使用 python 函数,它会不会导致相同的结果CustomOperator

如果有人知道什么是不同的用例和最佳实践,那就太好了!!

非常感谢你的帮助

airflow airflow-operator

9
推荐指数
1
解决办法
516
查看次数

如何将参数传递给 PapermillOperator 以在气流上运行作业?

使用 PapermillOperator 运行气流作业时,dag 执行失败。

我在将参数传递给 PapermillOperator 时遇到问题。

我打开papermill_operator.py (packages/ airflow /operators/papermill_operator.py) 并硬编码一行来指定 papameters

def execute(self, context):
        for i in range(len(self.inlets)):
            pm.execute_notebook(self.inlets[i].location, 
                                self.outlets[i].location,
                                parameters = dict(msgs="hello")
                                progress_bar=False, report_mode=True)
Run Code Online (Sandbox Code Playgroud)

然后它的工作

而原始代码是

def execute(self, context):
        for i in range(len(self.inlets)):
            pm.execute_notebook(self.inlets[i].location, 
                                self.outlets[i].location,
                                parameters=self.inlets[i].parameters,
                                progress_bar=False, report_mode=True)
Run Code Online (Sandbox Code Playgroud)

尝试了另一个解决方案 https://github.com/nteract/papermill/issues/324#issuecomment-472446375 它工作正常

我的 DAG 代码是

import airflow

from airflow.models import DAG
from airflow.operators.papermill_operator import PapermillOperator

from datetime import timedelta

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),

}

dag = DAG(
    dag_id='9', default_args=args,
    schedule_interval='@once',
    dagrun_timeout=timedelta(minutes=10))

run_this = PapermillOperator( …
Run Code Online (Sandbox Code Playgroud)

airflow jupyter-notebook papermill airflow-operator

8
推荐指数
1
解决办法
1377
查看次数

dag.py 引发:“airflow.exceptions.AirflowException:任务缺少 start_date 参数”,但它在代码中给出

我今天尝试创建我的第一个气流 DAG:

from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'default_user',
    'start_date': days_ago(2),
    'depends_on_past': True,
    # With this set to true, the pipeline won't run if the previous day failed
    'email': ['demo@email.de'],
    'email_on_failure': True,
    # upon failure this pipeline will send an email to your email set above
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=30),
}

dag = DAG(
    'basic_dag_2',
    default_args=default_args, …
Run Code Online (Sandbox Code Playgroud)

python-3.x airflow airflow-scheduler airflow-operator

8
推荐指数
1
解决办法
9543
查看次数

如何跳过气流操作员中的任务?

Airflow 有没有办法从 PythonOperator 跳过当前任务?例如:

def execute():
    if condition:
        skip_current_task()

task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)
Run Code Online (Sandbox Code Playgroud)

并且还在 Airflow UI 中将任务标记为“已跳过”?

python airflow airflow-scheduler airflow-operator

7
推荐指数
1
解决办法
3074
查看次数

没有名为airfow.gcp的模块-如何运行使用python3 / beam 2.15的数据流作业?

当我使用像BigQueryHook这样的运算符/挂钩时,我看到一条消息,表明这些运算符已被弃用,并使用airflow.gcp ...运算符版本。但是,当我尝试在dag中使用它时,它失败并说没有名为airflow.gcp的模块。我拥有带beta功能的最新气流作曲家版本python3。是否可以通过某种方式安装这些运算符?

我正在尝试使用梁2.15在python 3中运行数据流作业。我已经尝试过virtualenv运算符,但这不起作用,因为它只允许使用python2.7。我怎样才能做到这一点?

python-3.x google-cloud-dataflow airflow google-cloud-composer airflow-operator

7
推荐指数
1
解决办法
159
查看次数

基于事件触发和运行气流任务将文件放入 S3 存储桶

是否可以仅在发生特定事件(例如将文件放入特定 S3 存储桶的事件)时运行气流任务。类似于 AWS Lambda 事件的内容

有,S3KeySensor但我不知道它是否符合我的要求(仅在事件发生时运行 Task)

这是使问题更清楚的示例:

我有一个传感器对象如下

sensor = S3KeySensor(
    task_id='run_on_every_file_drop',
    bucket_key='file-to-watch-*',
    wildcard_match=True,
    bucket_name='my-sensor-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

使用上述传感器对象,传感器任务的气流行为如下:

  • 如果在气流管理 UI 中my-sensor-bucket切换 DAG 之前已经存在与 S3 存储桶中的通配符匹配的对象名称ON,则运行该任务(由于过去的 s3 对象的存在,我不想运行该任务)
  • 运行一次后,每当有新的 S3 文件对象删除时,传感器任务将不会再次运行(我想每次在存储桶中删除一个新的 S3 文件对象时,都运行 DAG 中的传感器任务和后续任务my-sensor-bucket
  • 如果您配置调度程序,任务将基于调度而不是基于事件运行。所以在这种情况下调度程序似乎不是一个选项

我试图了解气流中的任务是否只能基于调度(如 cron 作业)或传感器(仅基于传感标准一次)运行,还是不能像基于事件的管道(类似于 AWS Lambda 的东西)那样设置

amazon-s3 airflow airflow-scheduler apache-airflow-xcom airflow-operator

6
推荐指数
1
解决办法
3433
查看次数

如何获得手动触发 DAG 的 Airflow 用户?

在 Airflow UI 中,“浏览器 > 日志”下可用的日志事件之一是事件“触发器”以及负责触发此事件的 DAG ID 和所有者/用户。这些信息是否可以通过编程轻松获取?

用例是,我有一个 DAG,它允许一部分用户手动触发执行。根据触发此 DAG 执行的用户,从此 DAG 执行代码的行为会有所不同。

先感谢您。

airflow airflow-scheduler apache-airflow-xcom airflow-operator

6
推荐指数
1
解决办法
995
查看次数

如何在 Airflow 中的电子邮件操作员上附加文件

每当我将文件参数添加到email_task我的运行失败时。

email_task = EmailOperator(    
           task_id='email_sample_data',    
           to='sample@sample.com',    
           subject='Forecast for the day',    
           html_content= "Sample",
           files=['/home/airflow/sample.html'],    
           dag=dag)
Run Code Online (Sandbox Code Playgroud)

我收到一个错误,提示找不到文件。气流在哪里选择我的文件,我需要在哪里上传文件,“文件”参数的正确语法是什么?

airflow airflow-operator

6
推荐指数
1
解决办法
93
查看次数

将另一个气流连接传递给气流 SSH Operator 时,如何从日志和渲染模板中隐藏密码

我的 DAG 摘要:

我正在使用 SSH Operator 通过 SSH 连接到 EC2 实例并运行将连接到多个数据库的 JAR 文件。我已经在 DAG 文件中声明了 Airflow Connection,并且能够将变量传递到 EC2 实例中。正如您从下面看到的,我将属性传递给 JAVA 命令。

Airflow version - airflow-1-10.7
Package installed - apache-airflow[crypto]
Run Code Online (Sandbox Code Playgroud)
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.hooks.base_hook import BaseHook
from airflow.models.connection import Connection

ssh_hook = SSHHook(ssh_conn_id='ssh_to_ec2')
ssh_hook.no_host_key_check = True
redshift_connection = BaseHook.get_connection("my_redshift")

rs_user = redshift_connection.login
rs_password = redshift_connection.password

mongo_connection = BaseHook.get_connection("my_mongo")
mongo_user = mongo_connection.login
mongo_password = mongo_connection.password


default_args = …
Run Code Online (Sandbox Code Playgroud)

password-hash airflow-operator

5
推荐指数
1
解决办法
525
查看次数