我是Airflow和Spark的新手,并且正与SparkSubmitOperator苦苦挣扎。
我们的气流调度程序和我们的hadoop集群不在同一台机器上设置(第一个问题:这是一个好习惯吗?)。
我们有许多自动过程需要调用pyspark脚本。这些pyspark脚本存储在hadoop集群(10.70.1.35)中。气流开关存储在气流机(10.70.1.22)中。
当前,当我们想通过气流火花提交pyspark脚本时,我们使用一个简单的BashOperator,如下所示:
cmd = "ssh hadoop@10.70.1.35 spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 2g \
--executor-cores 2 \
/home/hadoop/pyspark_script/script.py"
t = BashOperator(task_id='Spark_datamodel',bash_command=cmd,dag=dag)
Run Code Online (Sandbox Code Playgroud)
它工作得很好。但是我们想开始使用SparkSubmitOperator来提交pyspark脚本。
我尝试了这个:
from airflow import DAG
from datetime import timedelta, datetime
from airflow.contrib.operators.spark_submit_operator import
SparkSubmitOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
dag = DAG('SPARK_SUBMIT_TEST',start_date=datetime(2018,12,10),
schedule_interval='@daily')
sleep = BashOperator(task_id='sleep', bash_command='sleep 10',dag=dag)
_config ={'application':'hadoop@10.70.1.35:/home/hadoop/pyspark_script/test_spark_submit.py',
'master' : 'yarn',
'deploy-mode' : 'cluster',
'executor_cores': 1,
'EXECUTORS_MEM': '2G'
}
spark_submit_operator = SparkSubmitOperator( …Run Code Online (Sandbox Code Playgroud)