Component Gateway activation on Dataproc does not work with the Composer (Airflow) operator airflow.providers.google.cloud.operators.dataproc

Thi*_*uda 5 google-cloud-platform google-cloud-dataproc airflow google-cloud-composer

I'm trying to run the DAG below. The operator that creates the Dataproc cluster does not seem to enable the optional components needed for Jupyter Notebook and Anaconda. I found this code here: Component Gateway with DataprocOperator on Airflow, and tried it, but it did not solve the problem for me; I think the Composer (Airflow) version there is different. My version is composer-2.0.0-preview.5 with airflow-2.1.4.


The operator creates the cluster just fine, but it does not create it with the optional components that enable Jupyter Notebook. Does anyone have an idea that could help me?

from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator, DataprocClusterDeleteOperator, DataProcSparkOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

yesterday = datetime.combine(datetime.today() - timedelta(1),
                             datetime.min.time())


default_args = {
    'owner': 'teste3',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'teste-dag-3', catchup=False, default_args=default_args, schedule_interval=None)


# configure the optional components
class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def __init__(self, *args, **kwargs):
        super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)

    def _build_cluster_data(self):
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        cluster_data['config']['endpointConfig'] = {
            'enableHttpPortAccess': True
        }
        cluster_data['config']['softwareConfig']['optionalComponents'] = ['JUPYTER', 'ANACONDA']
        return cluster_data


create_cluster = CustomDataprocClusterCreateOperator(
    dag=dag,
    task_id='start_cluster_example',
    cluster_name='teste-ge-{{ ds }}',
    project_id="sandbox-coe",
    num_workers=2,
    num_masters=1,
    master_machine_type='n2-standard-8',
    worker_machine_type='n2-standard-8',
    worker_disk_size=500,
    master_disk_size=500,
    master_disk_type='pd-ssd',
    worker_disk_type='pd-ssd',
    image_version='1.5.56-ubuntu18',
    tags=['allow-dataproc-internal'],
    region="us-central1",
    zone='us-central1-f',  # Variable.get('gc_zone'),
    storage_bucket="bucket-dataproc-ge",
    labels={'product': 'sample-label'},
    service_account_scopes=['https://www.googleapis.com/auth/cloud-platform'],
    # properties={"yarn:yarn.nodemanager.resource.memory-mb": 15360, "yarn:yarn.scheduler.maximum-allocation-mb": 15360},
    # subnetwork_uri="projects/project-id/regions/us-central1/subnetworks/dataproc-subnet",
    retries=1,
    retry_delay=timedelta(minutes=1)
)  # starts a dataproc cluster


stop_cluster_example = DataprocClusterDeleteOperator(
    dag=dag,
    task_id='stop_cluster_example',
    cluster_name='teste-ge-{{ ds }}',
    project_id="sandbox-coe",
    region="us-central1",
)  # stops a running dataproc cluster


create_cluster >> stop_cluster_example

Ela*_*lad 2

Edit: After digging in further, you no longer need a custom operator. The updated DataprocCreateClusterOperator has enable_component_gateway and optional_components, so you can set them directly:

from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator, DataprocCreateClusterOperator

CLUSTER_GENERATOR = ClusterGenerator(
    project_id=PROJECT_ID,
    region=REGION,
    ...,
    enable_component_gateway=True,
    optional_components=['JUPYTER', 'ANACONDA']
).make()

DataprocCreateClusterOperator(
    ...,
    cluster_config=CLUSTER_GENERATOR
)

You can check this example DAG for more details. You can see all the possible parameters in the ClusterGenerator source code.
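For reference, what ClusterGenerator(...).make() returns is a plain dict in the shape the Dataproc v1 API expects (snake_case field names). The sketch below rebuilds, in plain Python and without Airflow installed, only the subset of that dict the two parameters above control; build_partial_cluster_config is a hypothetical helper for illustration, and the image version is taken from the question's DAG.

```python
# Illustrative sketch: the portion of a Dataproc ClusterConfig dict that
# optional_components and enable_component_gateway control.

def build_partial_cluster_config(optional_components, enable_component_gateway):
    """Return the software_config / endpoint_config part of a cluster config,
    using the snake_case field names of the Dataproc v1 API."""
    return {
        "software_config": {
            "image_version": "1.5.56-ubuntu18",
            "optional_components": list(optional_components),
        },
        "endpoint_config": {
            # enable_component_gateway=True maps to this API field.
            "enable_http_port_access": enable_component_gateway,
        },
    }

config = build_partial_cluster_config(["JUPYTER", "ANACONDA"], True)
```

Passing a dict of this shape as cluster_config to DataprocCreateClusterOperator is equivalent to what the generator produces for these fields.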

Original answer: The operator was rewritten (see this PR). I think the problem is in your _build_cluster_data function.

You should probably change your code to:

def _build_cluster_data(self):
    cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
    cluster_data['config']['endpoint_config'] = {
        'enableHttpPortAccess': True
    }
    cluster_data['config']['software_config']['optional_components'] = ['JUPYTER', 'ANACONDA']  # redundant, see note 2
    return cluster_data
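The key change above is the casing of the dict keys: the rewritten operator hands cluster_data to the protobuf-based google-cloud-dataproc client, whose field names are snake_case, so the old camelCase keys such as 'endpointConfig' are no longer recognized. A small helper sketching that rename (camel_to_snake is not part of Airflow; it is only here to illustrate the mapping):

```python
import re

def camel_to_snake(name):
    """Convert a camelCase API field name to its snake_case form,
    e.g. 'endpointConfig' -> 'endpoint_config'."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

# Keys used by the old REST-based operator, renamed for the rewritten one:
old_keys = ["endpointConfig", "softwareConfig", "optionalComponents"]
new_keys = [camel_to_snake(k) for k in old_keys]
```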

A few notes:

  1. DataprocClusterCreateOperator (from airflow.contrib) is deprecated. You should use DataprocCreateClusterOperator from the google provider.

  2. You don't need to set the value in cluster_data directly; you can set it by passing optional_components to the operator (see the source code).