Tags: apache-spark, google-cloud-platform, google-cloud-composer
I find Google Cloud Composer to be a very promising managed Apache Airflow service, but I can't figure out how to execute a pipeline written in PySpark through Cloud Composer. I am able to install other Python packages (e.g. Pandas) and use them with Cloud Composer.
Any pointers would be much appreciated.
Cloud Composer is used to schedule pipelines.
So, to run PySpark code from Cloud Composer you need to create a Dataproc cluster, because PySpark jobs run on a Dataproc cluster. In the DAG you can schedule the creation of the Dataproc cluster with DataprocCreateClusterOperator. Once the cluster has been created, you can submit the PySpark job to the Dataproc cluster with DataprocSubmitJobOperator. To submit a job to the cluster you need to provide the job's source file. You can refer to the code below for reference.
PySpark code:
import pyspark
from operator import add

# Count the characters in "Hello World" and print them sorted by frequency.
sc = pyspark.SparkContext()
data = sc.parallelize(list("Hello World"))
counts = (
    data.map(lambda x: (x, 1))
    .reduceByKey(add)
    .sortBy(lambda x: x[1], ascending=False)
    .collect()
)
for (word, count) in counts:
    print("{}: {}".format(word, count))
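The DAG below expects this script to already be available in Cloud Storage (the PYSPARK_URI constant). If you still need to upload it, a minimal sketch using the google-cloud-storage client could look like the following; the bucket name and object paths are placeholders, not values from the setup above.

from google.cloud import storage

# Placeholder names -- replace with your own bucket and paths.
BUCKET_NAME = "your-bucket"
LOCAL_FILE = "wordcount.py"               # the PySpark script shown above
DESTINATION_BLOB = "pyspark/wordcount.py"

client = storage.Client()
bucket = client.bucket(BUCKET_NAME)
bucket.blob(DESTINATION_BLOB).upload_from_filename(LOCAL_FILE)
print("Uploaded to gs://{}/{}".format(BUCKET_NAME, DESTINATION_BLOB))

The resulting gs://... URI is what PYSPARK_URI should point to in the DAG below.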
DAG code:
import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocSubmitJobOperator,
)

PROJECT_ID = "give your project id"
CLUSTER_NAME = "your dataproc cluster name that you want to create"
REGION = "us-central1"
ZONE = "us-central1-a"
PYSPARK_URI = "GCS location of your PySpark code, i.e. gs://[input file]"

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_dag_args = {
    "start_date": YESTERDAY,
}

# Cluster definition
# [START how_to_cloud_dataproc_create_cluster]
CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
}

with models.DAG(
    "dataproc",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    # [START how_to_cloud_dataproc_create_cluster_operator]
    # Create the Dataproc cluster that will run the PySpark job.
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        cluster_config=CLUSTER_CONFIG,
        region=REGION,
        cluster_name=CLUSTER_NAME,
    )

    # Job definition: the PySpark script is read from its GCS location.
    PYSPARK_JOB = {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
    }

    # Submit the PySpark job to the cluster created above.
    pyspark_task = DataprocSubmitJobOperator(
        task_id="pyspark_task",
        job=PYSPARK_JOB,
        location=REGION,
        project_id=PROJECT_ID,
    )

    create_cluster >> pyspark_task
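Optionally, if you do not want the Dataproc cluster to keep running (and billing) after the job finishes, you can add a DataprocDeleteClusterOperator at the end of the same DAG. A short sketch, reusing the constants defined above:

# Extra import at the top of the DAG file
from airflow.providers.google.cloud.operators.dataproc import DataprocDeleteClusterOperator
from airflow.utils.trigger_rule import TriggerRule

# Inside the same `with models.DAG(...) as dag:` block
delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
    trigger_rule=TriggerRule.ALL_DONE,  # tear down even if the PySpark job fails
)

create_cluster >> pyspark_task >> delete_cluster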