将 Python 项目提交到 Dataproc 作业

Gal*_*ses 7 python pyspark google-cloud-dataproc

我有一个 python 项目,其文件夹具有以下结构

main_directory - lib - lib.py
               - run - script.py
Run Code Online (Sandbox Code Playgroud)

script.py

from lib.lib import add_two
spark = SparkSession \
    .builder \
    .master('yarn') \
    .appName('script') \
    .getOrCreate()

print(add_two(1,2))
Run Code Online (Sandbox Code Playgroud)

并且lib.py

def add_two(x,y):
    return x+y
Run Code Online (Sandbox Code Playgroud)

我想在 GCP 中启动 Dataproc 作业。我在网上查过,但不太明白该怎么做。我正在尝试启动脚本

main_directory - lib - lib.py
               - run - script.py
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误消息:

from lib.lib import add_two
ModuleNotFoundError: No module named 'lib.lib'
Run Code Online (Sandbox Code Playgroud)

您能否帮助我了解如何在 Dataproc 上启动该作业?我发现做到这一点的唯一方法是删除绝对路径,将其更改为script.py

 from lib import add_two
Run Code Online (Sandbox Code Playgroud)

并将工作启动为

from lib.lib import add_two
spark = SparkSession \
    .builder \
    .master('yarn') \
    .appName('script') \
    .getOrCreate()

print(add_two(1,2))
Run Code Online (Sandbox Code Playgroud)

但是,我想避免每次手动列出文件的繁琐过程。

按照@Igor的建议,打包成zip文件,我发现

zip -j --update -r libpack.zip /projectfolder/* && spark-submit --py-files libpack.zip /projectfolder/run/script.py
Run Code Online (Sandbox Code Playgroud)

作品。但是,这会将所有文件放在 libpack.zip 中的同一根文件夹中,因此如果子文件夹中存在同名文件,则此操作将不起作用。

有什么建议么?

gag*_*gan 5

压缩依赖项 -

cd base-path-to-python-modules
zip -qr deps.zip ./* -x script.py
Run Code Online (Sandbox Code Playgroud)

将 deps.zip 复制到 hdfs/gs。提交作业时使用 uri,如下所示。

使用 Dataproc 的 Python 连接器提交 python 项目 (pyspark)

from google.cloud import dataproc_v1
from google.cloud.dataproc_v1.gapic.transports import (
    job_controller_grpc_transport)

region = <cluster region>
cluster_name = <your cluster name>
project_id = <gcp-project-id>

job_transport = (
    job_controller_grpc_transport.JobControllerGrpcTransport(
        address='{}-dataproc.googleapis.com:443'.format(region)))
dataproc_job_client = dataproc_v1.JobControllerClient(job_transport)

job_file = <gs://bucket/path/to/main.py or hdfs://file/path/to/main/job.py>

# command line for the main job file
args = ['args1', 'arg2']

# required only if main python job file has imports from other modules
# can be one of .py, .zip, or .egg. 
addtional_python_files = ['hdfs://path/to/deps.zip', 'gs://path/to/moredeps.zip']

job_details = {
    'placement': {
        'cluster_name': cluster_name
    },
    'pyspark_job': {
        'main_python_file_uri': job_file,
        'args': args,
        'python_file_uris': addtional_python_files
    }
}

res = dataproc_job_client.submit_job(project_id=project_id,
                                     region=region, 
                                     job=job_details)
job_id = res.reference.job_id

print(f'Submitted dataproc job id: {job_id}')
Run Code Online (Sandbox Code Playgroud)


Igo*_*hak 1

如果您想在提交 Dataroc 作业时保留项目结构,那么您应该将项目打包到一个文件中,并在提交作业时.zip在参数中指定它:--py-files

gcloud dataproc jobs submit pyspark --cluster=$CLUSTER_NAME --region=$REGION \
  --py-files lib.zip \
  run/script.py
Run Code Online (Sandbox Code Playgroud)

要创建 zip 存档,您需要运行脚本:

cd main_directory/
zip -x run/script.py -r libs.zip .
Run Code Online (Sandbox Code Playgroud)

请参阅此博客文章,了解有关如何将 PySpark 作业的依赖项打包到 zip 存档中的更多详细信息。