小编Mas*_*syB的帖子

为什么任务在 trigger_dag 之后在 Airflow 1.10.2 中停留在 None 状态

我有一个虚拟 DAG,我想通过将其设置start_datetoday并让其调度间隔为daily

这是 DAG 代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -*- airflow: DAG -*-
import logging

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

logger = logging.getLogger("DummyDAG")


def execute_python_function():
    logging.info("HEY YO !!!")
    return True


dag = DAG(dag_id='dummy_dag',
          start_date=datetime.today())

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

py_operator = PythonOperator(task_id='exec_function',
                             python_callable=execute_python_function,
                             dag=dag)

start >> py_operator >> end
Run Code Online (Sandbox Code Playgroud)

在 Airflow 1.9.0 中,当我airflow trigger_dag -e 20190701 …

python-2.7 airflow

13
推荐指数
1
解决办法
1万
查看次数

如何在气流中使用 CLI 清除失败的 DAG

我有一些失败的 DAG,比方说从 1 月 1 日到 2 月 20 日。从那天起,他们都成功了。

我尝试使用cli(而不是使用 Web UI 执行 20 次):

airflow clear -f -t * my_dags.my_dag_id

但我有一个奇怪的错误:

airflow: error: unrecognized arguments: airflow-webserver.pid airflow.cfg airflow_variables.json my_dags.my_dag_id
Run Code Online (Sandbox Code Playgroud)

编辑 1:

就像@tobi6 解释的那样,*确实造成了麻烦。知道了这一点,我尝试了这个命令:

airflow clear -u -d -f -t ".*" my_dags.my_dag_id 
Run Code Online (Sandbox Code Playgroud)

但它只返回失败的任务实例(-f标志)。-d-u标志似乎不起作用,因为下游和上游的任务实例被忽略(不返回)。

编辑2:

就像@tobi6 建议的那样,使用-s-e允许选择日期范围内的所有 DAG 运行。这是命令:

airflow clear  -s "2018-04-01 00:00:00" -e "2018-04-01 00:00:00"  my_dags.my_dag_id.
Run Code Online (Sandbox Code Playgroud)

但是,向-f上面的命令添加标志只会返回失败的任务实例。是否可以选择日期范围内所有失败 DAG 运行的所有失败任务实例?

airflow

6
推荐指数
1
解决办法
4806
查看次数

Airflow BigQueryOperator:如何将查询结果保存在分区表中?

我有一个简单的DAG

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with DAG(dag_id='my_dags.my_dag') as dag:

    start = DummyOperator(task_id='start')

    end = DummyOperator(task_id='end')
    sql = """
             SELECT *
             FROM 'another_dataset.another_table'
          """
    bq_query = BigQueryOperator(bql=sql,
                            destination_dataset_table='my_dataset.my_table20180524'),
                            task_id='bq_query',
                            bigquery_conn_id='my_bq_connection',
                            use_legacy_sql=False,
                            write_disposition='WRITE_TRUNCATE',
                            create_disposition='CREATE_IF_NEEDED',
                            query_params={})
    start >> bq_query >> end
Run Code Online (Sandbox Code Playgroud)

执行bq_query任务时,SQL查询将保存在分片表中。我希望将其保存在每日分区表中。为此,我仅更改destination_dataset_tablemy_dataset.my_table$20180524。执行时出现以下错误bq_task

Partitioning specification must be provided in order to create partitioned table
Run Code Online (Sandbox Code Playgroud)

如何指定BigQuery将查询结果保存到每日分区表中?我的第一个猜测是使用query_paramsin,BigQueryOperator 但是我没有找到任何有关如何使用该参数的示例。

编辑:

我正在使用google-cloud==0.27.0python客户端...这是Prod中使用的一个客户端:(

google-bigquery airflow

5
推荐指数
1
解决办法
3529
查看次数

Apache Airflow 1.9.0中的min_file_process_interval和dag_dir_list_interval有什么区别?

我们正在使用Airflow v 1.9.0。我们有100多个dag,实例速度非常慢。调度程序仅启动一些任务。

为了减少CPU使用量,我们想调整一些配置参数,即:min_file_process_intervaldag_dir_list_interval。该文件并没有真正明确了两者的区别

airflow

5
推荐指数
1
解决办法
2788
查看次数

在BigQuery接收器中仅执行一次处理的情况下,改组是什么意思?

我正在阅读一篇有关由某些Dataflow源和接收器实施的一次精确处理的文章,但在理解BigQuery接收器上的示例时遇到了麻烦。从文章

生成随机UUID是非确定性操作,因此在插入BigQuery之前,我们必须添加一个重新排列。完成此操作后,Cloud Dataflow进行的任何重试将始终使用改组后的相同UUID。重复插入BigQuery的尝试将始终具有相同的插入ID,因此BigQuery可以对其进行过滤

// Apply a unique identifier to each record
c
 .apply(new DoFn<> {
  @ProcessElement
  public void processElement(ProcessContext context) {
   String uniqueId = UUID.randomUUID().toString();
   context.output(KV.of(ThreadLocalRandom.current().nextInt(0, 50),
                                     new RecordWithId(context.element(), uniqueId)));
 }
})
// Reshuffle the data so that the applied identifiers are stable and will not change.
.apply(Reshuffle.of<Integer, RecordWithId>of())
// Stream records into BigQuery with unique ids for deduplication.
.apply(ParDo.of(new DoFn<..> {
   @ProcessElement
   public void processElement(ProcessContext context) {
     insertIntoBigQuery(context.element().record(), context.element.id());
   }
 }); …
Run Code Online (Sandbox Code Playgroud)

dataflow google-bigquery apache-beam

5
推荐指数
2
解决办法
174
查看次数

如何从Airflow中的Big Query获取SQL查询的结果?

我想使用Airflow来获取SQL查询的结果,该结果被格式化为pandas DataFrame。

def get_my_query(*args, **kwargs)
    bq_hook = BigQueryHook(bigquery_conn_id='my_connection_id', delegate_to=None)
    my_query = """ 
                 SELECT col1, col2
                 FROM `my_bq_project.my_bq_dataset.my_table`
                """
    df = bq_hook.get_pandas_df(bql=my_query, dialect='standard')
    logging.info('df.head()\n{}'.format(df.head()))
Run Code Online (Sandbox Code Playgroud)

上面是我要在中执行的python函数PythonOperator。这是DAG:

my_dag = DAG('my_dag',start_date=datetime.today())
start = DummyOperator(task_id='start', dag=my_dag)
end = DummyOperator(task_id='end', dag=my_dag)
work = PythonOperator(task_id='work',python_callable=get_my_query, dag=my_dag)
start >> work >> end
Run Code Online (Sandbox Code Playgroud)

但是,工作步骤引发了异常。这是日志:

[2018-04-02 20:25:50,506] {base_task_runner.py:98} INFO - Subtask: [2018-04-02 20:25:50,506] {gcp_api_base_hook.py:82} INFO - Getting connection using a JSON key file.
[2018-04-02 20:25:51,035] {base_task_runner.py:98} INFO - Subtask: [2018-04-02 20:25:51,035] {slack_operator.py:70} ERROR - Slack API call failed …
Run Code Online (Sandbox Code Playgroud)

python pandas google-bigquery airflow

4
推荐指数
1
解决办法
2186
查看次数