在 Airflow 上使用 DataprocOperator 的组件网关

kwn*_*kwn 5 python google-cloud-platform google-cloud-dataproc airflow

在 GCP 中,从 UI 或 gcloud 命令安装和运行JupyterHub 组件相当简单。我正在尝试通过 Airflow 和DataprocClusterCreateOperator 编写进程脚本,这里是 DAG 的摘录

from airflow.contrib.operators import dataproc_operator  

create_cluster=dataproc_operator.DataprocClusterCreateOperator(
        task_id='create-' + CLUSTER_NAME, 
        cluster_name=CLUSTER_NAME,
        project_id=PROJECT_ID,
        num_workers=3,
        num_masters=1,
        master_machine_type='n1-standard-2',
        worker_machine_type='n1-standard-2',
        master_disk_size=100,
        worker_disk_size=100,
        storage_bucket='test-dataproc-jupyter', 
        region='europe-west4', 
        zone='europe-west4-a',
        auto_delete_ttl=21600, 
        optional_components=['JUPYTER', 'ANACONDA']
    )
Run Code Online (Sandbox Code Playgroud)

但是我无法指定所需的enable-component-gateway参数。查看源代码,似乎参数不是有意的(无论是在已弃用的还是 最后一个稳定的运算符中)。

我知道 REST API 提供了endpointConfig.enableHttpPortAccess,但我更愿意使用官方运营商。有谁知道如何实现这一目标?

Dav*_*itz 7

编辑,一个适合作曲家1.8.3 与气流1.10.3 的修复

在 Airflow 1.10.3 中,无法在外部创建集群配置。但是,我们可以继承集群创建操作符并覆盖配置创建。这也让我们可以设置可选组件,这是 Airflow 版本中缺少的一个参数。

class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def __init__(self, *args, **kwargs):
        super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)

    def _build_cluster_data(self):
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        cluster_data['config']['endpointConfig'] = {
            'enableHttpPortAccess': True
        }
        cluster_data['config']['softwareConfig']['optionalComponents'] = [ 'JUPYTER', 'ANACONDA' ]
        return cluster_data

#Start DataProc Cluster
dataproc_cluster = CustomDataprocClusterCreateOperator(
    task_id='create-' + CLUSTER_NAME, 
    cluster_name=CLUSTER_NAME,
    project_id=PROJECT_ID,
    num_workers=3,
    num_masters=1,
    master_machine_type='n1-standard-2',
    worker_machine_type='n1-standard-2',
    master_disk_size=100,
    worker_disk_size=100,
    storage_bucket='test-dataproc-jupyter', 
    region='europe-west4', 
    zone='europe-west4-a',
    auto_delete_ttl=21600, 
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

原始答案,适用于 Airflow 1.10.7

虽然不是最优的,但您可以自己创建 Cluster 数据结构,而不是让 Airflow 的 ClusterGenerator 来这样做。它应该适用于最新版本(1.10.7)

cluster = {
  'clusterName': CLUSTER_NAME,
  'config': {
    'gceClusterConfig': {
      'zoneUri': 'europe-west4-a'
    },
    'masterConfig': {
      'numInstances': 1,
      'machineTypeUri': 'n1-standard-2',
      'diskConfig': {
        'bootDiskSizeGb': 100
      },
    },
    'workerConfig': {
      'numInstances': 3,
      'machineTypeUri': 'n1-standard-2',
      'diskConfig': {
        'bootDiskSizeGb': 100
      },
    },
    'softwareConfig': {
      'optionalComponents': [
        'ANACONDA',
        'JUPYTER'
      ]
    },
    'lifestyleConfig': {
      'autoDeleteTtl': 21600
    },
    'endpointConfig': {
      'enableHttpPortAccess': True
    }
  },
  'projectId': PROJECT_ID
}
#Start DataProc Cluster
dataproc_cluster = DataprocClusterCreateOperator(
    task_id='create-' + CLUSTER_NAME,
    project_id=PROJECT_ID,
    num_workers=3,
    region='europe-west4', 
    zone='europe-west4-a',
    cluster = cluster,
    dag=DAG
)
Run Code Online (Sandbox Code Playgroud)

如果您使用的是其他 Airflow 版本,请指明。

您也可以为我打开的错误投票:AIRFLOW-6432