我有一个虚拟 DAG,我想通过将其设置start_date为today并让其调度间隔为daily
这是 DAG 代码:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -*- airflow: DAG -*-
import logging
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
logger = logging.getLogger("DummyDAG")
def execute_python_function():
logging.info("HEY YO !!!")
return True
dag = DAG(dag_id='dummy_dag',
start_date=datetime.today())
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
py_operator = PythonOperator(task_id='exec_function',
python_callable=execute_python_function,
dag=dag)
start >> py_operator >> end
Run Code Online (Sandbox Code Playgroud)
在 Airflow 1.9.0 中,当我airflow trigger_dag -e 20190701 …
我有一些失败的 DAG,比方说从 1 月 1 日到 2 月 20 日。从那天起,他们都成功了。
我尝试使用cli(而不是使用 Web UI 执行 20 次):
airflow clear -f -t * my_dags.my_dag_id
但我有一个奇怪的错误:
airflow: error: unrecognized arguments: airflow-webserver.pid airflow.cfg airflow_variables.json my_dags.my_dag_id
Run Code Online (Sandbox Code Playgroud)
编辑 1:
就像@tobi6 解释的那样,*确实造成了麻烦。知道了这一点,我尝试了这个命令:
airflow clear -u -d -f -t ".*" my_dags.my_dag_id
Run Code Online (Sandbox Code Playgroud)
但它只返回失败的任务实例(-f标志)。-d和-u标志似乎不起作用,因为下游和上游的任务实例被忽略(不返回)。
编辑2:
就像@tobi6 建议的那样,使用-s并-e允许选择日期范围内的所有 DAG 运行。这是命令:
airflow clear -s "2018-04-01 00:00:00" -e "2018-04-01 00:00:00" my_dags.my_dag_id.
Run Code Online (Sandbox Code Playgroud)
但是,向-f上面的命令添加标志只会返回失败的任务实例。是否可以选择日期范围内所有失败 DAG 运行的所有失败任务实例?
我有一个简单的DAG
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
with DAG(dag_id='my_dags.my_dag') as dag:
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
sql = """
SELECT *
FROM 'another_dataset.another_table'
"""
bq_query = BigQueryOperator(bql=sql,
destination_dataset_table='my_dataset.my_table20180524'),
task_id='bq_query',
bigquery_conn_id='my_bq_connection',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
query_params={})
start >> bq_query >> end
Run Code Online (Sandbox Code Playgroud)
执行bq_query任务时,SQL查询将保存在分片表中。我希望将其保存在每日分区表中。为此,我仅更改destination_dataset_table为my_dataset.my_table$20180524。执行时出现以下错误bq_task:
Partitioning specification must be provided in order to create partitioned table
Run Code Online (Sandbox Code Playgroud)
如何指定BigQuery将查询结果保存到每日分区表中?我的第一个猜测是使用query_paramsin,BigQueryOperator
但是我没有找到任何有关如何使用该参数的示例。
编辑:
我正在使用google-cloud==0.27.0python客户端...这是Prod中使用的一个客户端:(
我们正在使用Airflow v 1.9.0。我们有100多个dag,实例速度非常慢。调度程序仅启动一些任务。
为了减少CPU使用量,我们想调整一些配置参数,即:min_file_process_interval和dag_dir_list_interval。该文件并没有真正明确了两者的区别
我正在阅读一篇有关由某些Dataflow源和接收器实施的一次精确处理的文章,但在理解BigQuery接收器上的示例时遇到了麻烦。从文章
生成随机UUID是非确定性操作,因此在插入BigQuery之前,我们必须添加一个重新排列。完成此操作后,Cloud Dataflow进行的任何重试将始终使用改组后的相同UUID。重复插入BigQuery的尝试将始终具有相同的插入ID,因此BigQuery可以对其进行过滤
// Apply a unique identifier to each record
c
.apply(new DoFn<> {
@ProcessElement
public void processElement(ProcessContext context) {
String uniqueId = UUID.randomUUID().toString();
context.output(KV.of(ThreadLocalRandom.current().nextInt(0, 50),
new RecordWithId(context.element(), uniqueId)));
}
})
// Reshuffle the data so that the applied identifiers are stable and will not change.
.apply(Reshuffle.of<Integer, RecordWithId>of())
// Stream records into BigQuery with unique ids for deduplication.
.apply(ParDo.of(new DoFn<..> {
@ProcessElement
public void processElement(ProcessContext context) {
insertIntoBigQuery(context.element().record(), context.element.id());
}
}); …Run Code Online (Sandbox Code Playgroud) 我想使用Airflow来获取SQL查询的结果,该结果被格式化为pandas DataFrame。
def get_my_query(*args, **kwargs)
bq_hook = BigQueryHook(bigquery_conn_id='my_connection_id', delegate_to=None)
my_query = """
SELECT col1, col2
FROM `my_bq_project.my_bq_dataset.my_table`
"""
df = bq_hook.get_pandas_df(bql=my_query, dialect='standard')
logging.info('df.head()\n{}'.format(df.head()))
Run Code Online (Sandbox Code Playgroud)
上面是我要在中执行的python函数PythonOperator。这是DAG:
my_dag = DAG('my_dag',start_date=datetime.today())
start = DummyOperator(task_id='start', dag=my_dag)
end = DummyOperator(task_id='end', dag=my_dag)
work = PythonOperator(task_id='work',python_callable=get_my_query, dag=my_dag)
start >> work >> end
Run Code Online (Sandbox Code Playgroud)
但是,工作步骤引发了异常。这是日志:
[2018-04-02 20:25:50,506] {base_task_runner.py:98} INFO - Subtask: [2018-04-02 20:25:50,506] {gcp_api_base_hook.py:82} INFO - Getting connection using a JSON key file.
[2018-04-02 20:25:51,035] {base_task_runner.py:98} INFO - Subtask: [2018-04-02 20:25:51,035] {slack_operator.py:70} ERROR - Slack API call failed …Run Code Online (Sandbox Code Playgroud)