Tags: google-cloud-platform, google-cloud-dataproc, airflow, google-cloud-composer
I'm trying to run the DAG below. The operator that creates the Dataproc cluster does not seem to enable the optional components needed for Jupyter Notebook and Anaconda. I found this code here: Component Gateway with DataprocOperator on Airflow, and tried it, but it did not solve the problem for me; I think the Composer (Airflow) version there is different. Mine is composer-2.0.0-preview.5 with airflow-2.1.4.
The operator creates the cluster just fine, but it does not create it with the optional components that enable Jupyter Notebook. Does anyone have an idea that could help me?
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator, DataprocClusterDeleteOperator, DataProcSparkOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

yesterday = datetime.combine(datetime.today() - timedelta(1),
                             datetime.min.time())


default_args = {
    'owner': 'teste3',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'teste-dag-3', catchup=False, default_args=default_args, schedule_interval=None)


# configure the optional components
class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def __init__(self, *args, **kwargs):
        super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)

    def _build_cluster_data(self):
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        cluster_data['config']['endpointConfig'] = {
            'enableHttpPortAccess': True
        }
        cluster_data['config']['softwareConfig']['optionalComponents'] = ['JUPYTER', 'ANACONDA']
        return cluster_data


create_cluster = CustomDataprocClusterCreateOperator(
    dag=dag,
    task_id='start_cluster_example',
    cluster_name='teste-ge-{{ ds }}',
    project_id="sandbox-coe",
    num_workers=2,
    num_masters=1,
    master_machine_type='n2-standard-8',
    worker_machine_type='n2-standard-8',
    worker_disk_size=500,
    master_disk_size=500,
    master_disk_type='pd-ssd',
    worker_disk_type='pd-ssd',
    image_version='1.5.56-ubuntu18',
    tags=['allow-dataproc-internal'],
    region="us-central1",
    zone='us-central1-f',  # Variable.get('gc_zone')
    storage_bucket="bucket-dataproc-ge",
    labels={'product': 'sample-label'},
    service_account_scopes=['https://www.googleapis.com/auth/cloud-platform'],
    # properties={"yarn:yarn.nodemanager.resource.memory-mb": 15360, "yarn:yarn.scheduler.maximum-allocation-mb": 15360},
    # subnetwork_uri="projects/project-id/regions/us-central1/subnetworks/dataproc-subnet",
    retries=1,
    retry_delay=timedelta(minutes=1)
)  # starts a Dataproc cluster


stop_cluster_example = DataprocClusterDeleteOperator(
    dag=dag,
    task_id='stop_cluster_example',
    cluster_name='teste-ge-{{ ds }}',
    project_id="sandbox-coe",
    region="us-central1",
)  # deletes the running Dataproc cluster


create_cluster >> stop_cluster_example
Edit:
After digging deeper: you no longer need a custom operator. The updated DataprocCreateClusterOperator supports enable_component_gateway and optional_components (via ClusterGenerator), so you can set them directly:
from airflow.providers.google.cloud.operators.dataproc import ClusterGenerator, DataprocCreateClusterOperator

CLUSTER_GENERATOR = ClusterGenerator(
    project_id=PROJECT_ID,
    region=REGION,
    ...,
    enable_component_gateway=True,
    optional_components=['JUPYTER', 'ANACONDA'],
).make()

DataprocCreateClusterOperator(
    ...,
    cluster_config=CLUSTER_GENERATOR,
)
You can check this example DAG for more details. You can see all the possible parameters of ClusterGenerator in the source code.
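For intuition, `ClusterGenerator(...).make()` returns a plain cluster-config dict that the operator forwards to the Dataproc API. The sketch below hand-builds the two fields relevant here; it is an illustration only (`build_cluster_config` is a hypothetical helper, and the real generator fills in many more fields):

```python
# Hand-built sketch of the portion of the cluster config that controls
# Component Gateway and optional components. Field names follow the
# snake_case style used by the Google provider; this is an assumption
# for illustration, not the generator's full output.
def build_cluster_config(optional_components, enable_gateway):
    """Assemble the parts of a Dataproc cluster config relevant here."""
    return {
        "software_config": {
            "image_version": "1.5.56-ubuntu18",
            "optional_components": list(optional_components),
        },
        "endpoint_config": {
            "enable_http_port_access": enable_gateway,
        },
    }

config = build_cluster_config(["JUPYTER", "ANACONDA"], True)
```

Passing a dict of this shape as `cluster_config` is what the `DataprocCreateClusterOperator` call above does with the generator's output.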
Original answer:
The operator was rewritten (see the PR). I think the problem is in your _build_cluster_data function.
You should probably change your code to:
def _build_cluster_data(self):
    cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
    cluster_data['config']['endpoint_config'] = {
        'enableHttpPortAccess': True
    }
    cluster_data['config']['software_config']['optional_components'] = ['JUPYTER', 'ANACONDA']  # redundant, see note 2
    return cluster_data
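To see why this kind of override works at all, here is a standalone sketch of the subclass pattern. `FakeBaseOperator` is a stub standing in for the deprecated DataprocClusterCreateOperator (an assumption for illustration; the real operator builds a much larger spec); the subclass extends the dict the base hook returns:

```python
# Standalone illustration of overriding _build_cluster_data.
class FakeBaseOperator:
    def _build_cluster_data(self):
        # The real operator returns a full cluster spec; this stub
        # returns just enough structure for the override to extend.
        return {'config': {'software_config': {'image_version': '1.5.56-ubuntu18'}}}

class CustomOperator(FakeBaseOperator):
    def _build_cluster_data(self):
        cluster_data = super()._build_cluster_data()
        # Turn on Component Gateway and add the optional components.
        cluster_data['config']['endpoint_config'] = {'enableHttpPortAccess': True}
        cluster_data['config']['software_config']['optional_components'] = ['JUPYTER', 'ANACONDA']
        return cluster_data

data = CustomOperator()._build_cluster_data()
```

The override only works if the injected keys match what the installed operator version sends to the API, which is why the key casing matters across versions.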
A few notes:
1. DataprocClusterCreateOperator is deprecated. You should use DataprocCreateClusterOperator from the Google provider instead.
2. You don't need to set cluster_data['config']['software_config']['optional_components'] by hand; you can set it directly by passing optional_components to the operator (see the source code).