我在学习 apache 气流,发现有一个叫做 DummyOperator 的操作符。我用谷歌搜索了它的用例,但找不到任何我能理解的东西。这里有人可以讨论它的用例吗?
当我使用 Apache Airflow 时,我似乎无法找到为什么有人会创建CustomOperator
一个PythonOperator
. 如果我在 aPythonOperator
而不是 a 中使用 python 函数,它会不会导致相同的结果CustomOperator
?
如果有人知道什么是不同的用例和最佳实践,那就太好了!!
非常感谢你的帮助
使用 PapermillOperator 运行气流作业时,dag 执行失败。
我在将参数传递给 PapermillOperator 时遇到问题。
我打开papermill_operator.py (packages/ airflow /operators/papermill_operator.py) 并硬编码一行来指定 papameters
def execute(self, context):
for i in range(len(self.inlets)):
pm.execute_notebook(self.inlets[i].location,
self.outlets[i].location,
parameters = dict(msgs="hello")
progress_bar=False, report_mode=True)
Run Code Online (Sandbox Code Playgroud)
然后它的工作
而原始代码是
def execute(self, context):
for i in range(len(self.inlets)):
pm.execute_notebook(self.inlets[i].location,
self.outlets[i].location,
parameters=self.inlets[i].parameters,
progress_bar=False, report_mode=True)
Run Code Online (Sandbox Code Playgroud)
尝试了另一个解决方案 https://github.com/nteract/papermill/issues/324#issuecomment-472446375 它工作正常
我的 DAG 代码是
import airflow
from airflow.models import DAG
from airflow.operators.papermill_operator import PapermillOperator
from datetime import timedelta
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='9', default_args=args,
schedule_interval='@once',
dagrun_timeout=timedelta(minutes=10))
run_this = PapermillOperator( …
Run Code Online (Sandbox Code Playgroud) 我今天尝试创建我的第一个气流 DAG:
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'default_user',
'start_date': days_ago(2),
'depends_on_past': True,
# With this set to true, the pipeline won't run if the previous day failed
'email': ['demo@email.de'],
'email_on_failure': True,
# upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}
dag = DAG(
'basic_dag_2',
default_args=default_args, …
Run Code Online (Sandbox Code Playgroud) Airflow 有没有办法从 PythonOperator 跳过当前任务?例如:
def execute():
if condition:
skip_current_task()
task = PythonOperator(task_id='task', python_callable=execute, dag=some_dag)
Run Code Online (Sandbox Code Playgroud)
并且还在 Airflow UI 中将任务标记为“已跳过”?
当我使用像BigQueryHook这样的运算符/挂钩时,我看到一条消息,表明这些运算符已被弃用,并使用airflow.gcp ...运算符版本。但是,当我尝试在dag中使用它时,它失败并说没有名为airflow.gcp的模块。我拥有带beta功能的最新气流作曲家版本python3。是否可以通过某种方式安装这些运算符?
我正在尝试使用梁2.15在python 3中运行数据流作业。我已经尝试过virtualenv运算符,但这不起作用,因为它只允许使用python2.7。我怎样才能做到这一点?
python-3.x google-cloud-dataflow airflow google-cloud-composer airflow-operator
是否可以仅在发生特定事件(例如将文件放入特定 S3 存储桶的事件)时运行气流任务。类似于 AWS Lambda 事件的内容
有,S3KeySensor
但我不知道它是否符合我的要求(仅在事件发生时运行 Task)
这是使问题更清楚的示例:
我有一个传感器对象如下
sensor = S3KeySensor(
task_id='run_on_every_file_drop',
bucket_key='file-to-watch-*',
wildcard_match=True,
bucket_name='my-sensor-bucket',
timeout=18*60*60,
poke_interval=120,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
使用上述传感器对象,传感器任务的气流行为如下:
my-sensor-bucket
切换 DAG 之前已经存在与 S3 存储桶中的通配符匹配的对象名称ON
,则运行该任务(由于过去的 s3 对象的存在,我不想运行该任务)my-sensor-bucket
)我试图了解气流中的任务是否只能基于调度(如 cron 作业)或传感器(仅基于传感标准一次)运行,还是不能像基于事件的管道(类似于 AWS Lambda 的东西)那样设置
amazon-s3 airflow airflow-scheduler apache-airflow-xcom airflow-operator
在 Airflow UI 中,“浏览器 > 日志”下可用的日志事件之一是事件“触发器”以及负责触发此事件的 DAG ID 和所有者/用户。这些信息是否可以通过编程轻松获取?
用例是,我有一个 DAG,它允许一部分用户手动触发执行。根据触发此 DAG 执行的用户,从此 DAG 执行代码的行为会有所不同。
先感谢您。
airflow airflow-scheduler apache-airflow-xcom airflow-operator
每当我将文件参数添加到email_task
我的运行失败时。
email_task = EmailOperator(
task_id='email_sample_data',
to='sample@sample.com',
subject='Forecast for the day',
html_content= "Sample",
files=['/home/airflow/sample.html'],
dag=dag)
Run Code Online (Sandbox Code Playgroud)
我收到一个错误,提示找不到文件。气流在哪里选择我的文件,我需要在哪里上传文件,“文件”参数的正确语法是什么?
我的 DAG 摘要:
我正在使用 SSH Operator 通过 SSH 连接到 EC2 实例并运行将连接到多个数据库的 JAR 文件。我已经在 DAG 文件中声明了 Airflow Connection,并且能够将变量传递到 EC2 实例中。正如您从下面看到的,我将属性传递给 JAVA 命令。
Airflow version - airflow-1-10.7
Package installed - apache-airflow[crypto]
Run Code Online (Sandbox Code Playgroud)
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.hooks.base_hook import BaseHook
from airflow.models.connection import Connection
ssh_hook = SSHHook(ssh_conn_id='ssh_to_ec2')
ssh_hook.no_host_key_check = True
redshift_connection = BaseHook.get_connection("my_redshift")
rs_user = redshift_connection.login
rs_password = redshift_connection.password
mongo_connection = BaseHook.get_connection("my_mongo")
mongo_user = mongo_connection.login
mongo_password = mongo_connection.password
default_args = …
Run Code Online (Sandbox Code Playgroud)