Víc*_*lón 0 python hadoop-yarn anaconda apache-spark pyspark
Python : 3.7.3
OS: CentOS 7
Spark: 2.2.0
In Cloudera
YARN : 2.6.0-cdh5.10.2
Run Code Online (Sandbox Code Playgroud)
嗨,我尝试使用带有pyspark 的python脚本执行Apache Spark,但我不明白它是如何工作的。我尝试在客户端模式下使用yarn发送整个conda 环境,并在执行. 但问题是,主要的 python 脚本在哪里运行,因为我需要指定我的共享 conda 环境的位置才能执行而不会出错,因为在我尝试执行的主机中我没有安装依赖项,我不想要安装它。--archivesspark-submitspark-submit
我使用此功能打包环境https://conda.github.io/conda-pack/spark.html,我需要导入地图外的依赖项(因为在地图内,纱线运送依赖项和执行者很好地导入了这个依赖项)。
有没有一种方法可以在不打开和在主机上使用的情况下使用附带的环境执行主 python 脚本?
我的环境是:
PYSPARK_DRIVER_PYTHON=./enviroment/bin/python
PYSPARK_PYTHON=./enviroment/bin/python
Run Code Online (Sandbox Code Playgroud)
其中环境是yarn附带的依赖项的符号链接
--archives ~/dependencies.tar.gz#enviroment
Run Code Online (Sandbox Code Playgroud)
并配置执行器
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python
Run Code Online (Sandbox Code Playgroud)
所以最后的命令是
PYSPARK_DRIVER_PYTHON=./enviroment/bin/python \
PYSPARK_PYTHON=./environment/bin/python \
spark-submit \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
--master yarn --deploy-mode client \
--archives enviroment/dependencies.tar.gz#enviroment \
cluster-import-check.py
Run Code Online (Sandbox Code Playgroud)
我的代码是
# coding=utf8
from pyspark import SparkConf
from pyspark import SparkContext
import sys
import numpy
def check_import(version, info=None):
    print("=====VERSION : ", version)
    if info and type(info) == list and len(info) != 0:
        for i in info:
            print("=====INFO EXTRA : ", i)
def python_import(x):
    import sys
    print("\n===PYTHON")
    check_import(sys.version, [sys.executable])
def numpy_import(x):
    import numpy
    print("\n===NUMPY")
    check_import(numpy.__version__, [numpy.__file__])
def printInfo(object):
    print("=====NAME : ", object.__name__)
    if object.__name__ == 'sys':
        print("=====VERSION", object.version)
        print("=====LOCATED IN", object.executable)
    else:
        print("=====VERSION : ", object.__version__)
        print("=====LOCATED IN : ", object.__file__)
    if object.__name__ == 'elasticsearch':
        es = elasticsearch.Elasticsearch(['172.22.248.206:9201'])
        print("=====MORE INFO : ", es.info())
def init_spark():
    conf = SparkConf()
    conf.setAppName("imports-checking")
    sc = SparkContext(conf=conf).getOrCreate()
    return conf, sc
def main():
    conf, sc = init_spark()
    print(sc.getConf().getAll())
    print(sc.parallelize([0]).map(lambda x: python_import(x)).collect())
    sc.stop()
if __name__ == '__main__':
    printInfo(sys)
    printInfo(numpy)
    main()
Run Code Online (Sandbox Code Playgroud)
一个错误是no module named numpy或者定位的python是其他的,因为在集群中有另一个版本的python,但我想在集群上使用yarn提供的整个环境。
最近了解了 PysPark with Yarn 的工作流程,答案是如果你想在客户端模式下运行你需要安装(在你执行 spark-submit 的主机上)所有导入到函数映射之外的库。另一方面,如果你想在集群模式下运行,你只需要在 spark-submit 命令中使用 --archives 选项来发送库。
当它被执行时,它是在本地完成的,必须在 spark-submit 的执行中配置 PYSPARK_DRIVER_PYTHON
PYSPARK_DRIVER_PYTHON=./dependencies/bin/python spark-submit --master local --py-files cognition.zip MainScript.py
Run Code Online (Sandbox Code Playgroud)
执行在执行 spark-submit 命令的主机上进行。必须添加环境变量
PYSPARK_DRIVER_PYTHON=./dependencies/bin/python spark-submit --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./dependencies/bin/python --master yarn --deploy-mode client --archives dependencies.tar.gz#dependencies MainScript.py
Run Code Online (Sandbox Code Playgroud)
执行在它创建的yarn容器内部进行,并且在集群的任何节点中执行
spark-submit --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./dependencies/bin/python --master yarn --deploy-mode cluster --archives dependencies.tar.gz#dependencies MainScript.py
Run Code Online (Sandbox Code Playgroud)
        |   归档时间:  |  
           
  |  
        
|   查看次数:  |  
           1301 次  |  
        
|   最近记录:  |