假设你有一个气流DAG是没有意义回填,这意味着,它的运行一次后,运行它随后的时间很快就完全没有意义的.
例如,如果您从一些仅每小时更新一次的数据库加载数据到数据库中,那么快速连续发生的回填只会一次又一次地导入相同的数据.
当您实例化一个新的每小时任务时,这尤其令人讨厌,并且它会在N您指定的时间间隔内开始运行之前,每小时都会错过一次,执行冗余工作.
我能想到的唯一解决方案是他们在文档的常见问题解答中特别建议的
我们建议不要将动态值用作start_date,尤其是
datetime.now()因为它可能非常混乱.
有没有办法禁用DAG的回填,或者我应该怎么做?
我有两个环境变量.一个是TF_VAR_UN另一个是TF_VAR_PW.然后我有一个看起来像这样的terraform文件.
resource "google_container_cluster" "primary" {
name = "marcellus-wallace"
zone = "us-central1-a"
initial_node_count = 3
master_auth {
username = ${env.TF_VAR_UN}
password = ${env.TF_VAR_PW}
}
node_config {
oauth_scopes = [
"https://www.googleapis.com/auth/compute",
"https://www.googleapis.com/auth/devstorage.read_only",
"https://www.googleapis.com/auth/logging.write",
"https://www.googleapis.com/auth/monitoring"
]
}
}
Run Code Online (Sandbox Code Playgroud)
这两个值我想用环境变量替换TF_VAR_UN和TF_VAR_PW是值的用户名和密码.我尝试了上面显示的内容,没有成功,我玩弄了一些其他的东西,但总是得到语法问题.
嗨,我想使用气流配置单元运算符执行配置单元查询并将结果输出到文件。我不想在这里使用 INSERT OVERWRITE。
hive_ex = HiveOperator(
task_id='hive-ex',
hql='/sql/hive-ex.sql',
hiveconfs={
'DAY': '{{ ds }}',
'YESTERDAY': '{{ yesterday_ds }}',
'OUTPUT': '{{ file_path }}'+'csv',
},
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
做这个的最好方式是什么?
我知道如何使用 bash 运算符执行此操作,但想知道我们是否可以使用 hive 运算符
hive_ex = BashOperator(
task_id='hive-ex',
bash_command='hive -f hive.sql -DAY={{ ds }} >> {{ file_path }}
/file_{{ds}}.json',
dag=dag
)
Run Code Online (Sandbox Code Playgroud) 问题:我想apache-airflow在Github上使用最新版本的Apache-Airflow以及所有依赖项进行安装吗?
我该如何使用pip?
在生产环境中使用它也安全吗?
我使用Airflow来管理ETL任务的执行和计划。已创建DAG,并且工作正常。但是通过cli手动触发dag时可以传递参数。
例如:我的DAG每天在01:30运行,并处理昨天的数据(时间范围从昨天的01:30到今天的01:30)。数据源可能存在一些问题。我需要重新处理这些数据(手动指定时间范围)。
因此,我可以在预定的时候创建这样的气流DAG,使其默认时间范围为昨天的01:30到今天的01:30。然后,如果数据源有任何问题,我需要手动触发DAG并手动将时间范围作为参数传递。
据我所知airflow test有-tp可以通过PARAMS的任务。但这仅用于测试特定任务。并且airflow trigger_dag没有-tp选择。那么有没有办法将tigger_dag传递给DAG,然后操作员可以读取这些参数?
谢谢!
在阅读有关使用Java处理apache Beam中的流式元素的知识时,我遇到了DoFn<InputT, OutputT>,然后发现SimpleFunction<InputT, OutputT>。
两者看上去都与我相似,我很难理解它们之间的区别。
有人可以解释外行术语的区别吗?
我正在运行 Dataproc 并使用默认的client-mode. 作业的日志在 GCP 控制台中可见,在 GCS 存储桶中可用。但是,我想查看Stackdriver Logging 中的日志。
目前,我发现的唯一方法是cluster-mode改用。
有没有办法在使用时将日志推送到 Stackdriver client-mode?
我想将包含 google 存储中文件名的字符串列表传递给 XCom。稍后由 GoogleCloudStorageToBigQueryOperator 任务获取。该source_objects场模板,使神社模板可以使用。不幸的是,Jinja 只能返回一个字符串,因此我无法在 XCom 中传递列表。
如何在 GoogleCloudStorageToBigQueryOperator 中使用 XCom 列表?
参考类似问题,通过使用provide_context解决: 将字符串列表作为 Airflow 中依赖任务的参数传递
我找到的最接近的解决方案是创建一个包装类并发送发布 xcom 的任务的 ID,如下所示:
@apply_defaults
def __init__(self, source_objects_task_id,
....
def execute(self, context):
source_objects = context['ti']
.xcom_pull(task_ids=self.source_objects_task_id)
operator = GoogleCloudStorageToBigQueryOperator(
source_objects=source_objects,
dag=self.dag,
....
)
operator.execute(context)
Run Code Online (Sandbox Code Playgroud) 我想做一个松弛警报(失败状态和成功状态)
Dag 现在正在工作,当状态为 sucssess 状态时,松弛警报正在工作!但现在, on_failure_callback - 失败状态持续工作(每 1 分钟一次)
请注意,它继续失败。但是,它不起作用,我认为这不是真正的状态。
我该如何改变呢?...我想知道关于真正失败状态的松弛通知
现在我们的 task_default 参数是这样的。
dt = datetime.now(tz=tz.tzlocal())
task_default_args = {
'owner': 'owner',
'retries': 2,
'retry_delay': timedelta(minutes=1),
'start_date': datetime(2018, 11, 10),
#'depends_on_past': False,
'email': ['mail'],
'email_on_failure': True,
'email_on_retry': False,
'on_failure_callback': send_slack(
senderRole='airflow',
receiverSubscribe='bot',
level='info',
text='= fail' + str(dt),
X_CAG_AUTH='AG_CONSUMER_TOKEN access-key=500000000000',
),
'execution_timeout': timedelta(minutes=30)
}
-- > Dag Contents like this
start = DummyOperator(
task_id='start',
dag=dag)
tmp_slack_test_dag = PostgresOperator(pool=redshift_pool,
task_id='tmp_slack_test_sql',
postgres_conn_id=redshift_conn_id,
sql="""sql/tmp_.sql""",
parameters=None,
autocommit=True,
dag=dag
)
success_dummy = DummyOperator(
task_id='success_dummy',
dag=dag, …Run Code Online (Sandbox Code Playgroud)