Pipeline dependencies in Data Fusion


I have three pipelines in Data Fusion, say A, B, and C. I want pipeline C to be triggered after both pipeline A and pipeline B finish executing. A pipeline trigger can depend on only one pipeline. Can this be achieved in Data Fusion?


You can do this using Google Cloud Composer [1]. First, create a new environment in Cloud Composer [2]. Once it is ready, install an additional Python package in that environment [3]; the package you need is "apache-airflow-backport-providers-google" [4].

With this package installed you can use the Data Fusion operators [5]. The one you need is "Start a DataFusion pipeline" [6], which lets you start a Data Fusion pipeline from Airflow.

An example of the Python code would be as follows:

import airflow
import datetime
from datetime import timedelta

from airflow import models
# CloudDataFusionStartPipelineOperator comes from the
# apache-airflow-backport-providers-google package installed above.
from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator
)

# Arguments applied to every task in the DAG.
default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with models.DAG(
    'composer_DF',
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_args) as dag:

    # Tasks that start the corresponding Data Fusion pipelines.
    A = CloudDataFusionStartPipelineOperator(
            location="us-west1", pipeline_name="A", 
            instance_name="instance_name", task_id="start_pipelineA",
        )
    B = CloudDataFusionStartPipelineOperator(
            location="us-west1", pipeline_name="B", 
            instance_name="instance_name", task_id="start_pipelineB",
        )
    C = CloudDataFusionStartPipelineOperator(
            location="us-west1", pipeline_name="C", 
            instance_name="instance_name", task_id="start_pipelineC",
        )
    # Run the pipelines sequentially: first A, then B, then C.
    A >> B >> C
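Note that in the DAG above, B also waits for A. If pipelines A and B are independent and only C has to wait for both of them (as described in the question), you can instead fan the dependencies in by replacing the last two lines of the DAG with:

    # A and B run in parallel; C starts only after both have finished.
    [A, B] >> C

In Airflow, every task in the list becomes an upstream dependency of C, so C is triggered only once both A and B have completed.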

You can adjust the schedule_interval and the other DAG arguments; see the Airflow scheduling documentation for the available options.
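For example, assuming you want the DAG to run once a day at a fixed time instead of on a timedelta interval, schedule_interval also accepts a cron expression (the value below is only illustrative):

    with models.DAG(
        'composer_DF',
        schedule_interval='0 6 * * *',  # run once a day at 06:00
        default_args=default_args) as dag:
        ...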

Once you have saved this code as a .py file, upload it to the Google Cloud Storage DAG folder of your Composer environment (for example with gsutil cp or with gcloud composer environments storage dags import).

When the DAG runs, it will execute task A; when A finishes, it will execute task B, and so on.

[1] https://cloud.google.com/composer

[2] https://cloud.google.com/composer/docs/how-to/managing/creating

[3] https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies

[4] https://pypi.org/project/apache-airflow-backport-providers-google/

[5] https://airflow.readthedocs.io/en/latest/_api/airflow/providers/google/cloud/operators/datafusion/index.html

[6] https://airflow.readthedocs.io/en/latest/howto/operator/google/cloud/datafusion.html#start-a-datafusion-pipeline