Nic*_*ick 5 apache-spark airflow
火花和气流的新手,试图了解如何使用气流启动作业以及该作业所需的参数。我使用以下spark-submit命令在边缘节点中针对特定日期运行特定作业,如下所示,
EXECUTORS_MEM=4G
EXECUTORS_NUM=300
STARTDAY=20180401
ENDDAY=20180401
QUEUE=m
jobname=x
/home/spark/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --verbose --master yarn --deploy-mode client $EXECUTORS_NUM --executor-memory $EXECUTORS_MEM --executor-cores 1 --driver-memory 8G --queue $QUEUE --class test.core.Driver --jars $JARS2 abc.jar --config=/a/b/c/test.config --appName=abc --sparkMaster=yarnclient --job=$jobname --days=$STARTDAY,$ENDDAY
Run Code Online (Sandbox Code Playgroud)
所以,您能否让我知道是否创建了.py类似于下面的代码来在气流中运行作业?这是您应该如何运行作业并传递参数的方法吗?
如何像在边缘节点中启动作业那样传递参数?
如果我将作业自动化以每天运行,我希望开始日期为“ t-7”,因此,如果今天的日期为2018年4月20日,则该工作的开始日期必须为2018年4月13日。我该如何实现?
###############.py file example ##############
**********************************************
import BashOperator
import os
import sys
os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
import os
import sys
os.environ['SPARK_HOME'] = '/home/spark/spark-2.1.0-bin-hadoop2.6/bin/'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
and add operator:
spark_task = BashOperator(
task_id='spark_java',
bash_command='spark-submit --class test.core.Driver abc.jar',
params={'EXECUTORS_MEM': '4G', 'EXECUTORS_NUM': '300', 'QUEUE' :'m' , 'jobname' : 'x'},
dag=dag
)
################### EOF ######################
**********************************************
Run Code Online (Sandbox Code Playgroud)
是否可以手动传递某些开始和结束日期以使作业运行?
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils import timezone
DEFAULT_DATE = timezone.datetime(2017, 1, 1)
args = {
'owner': 'airflow',
'start_date': DEFAULT_DATE
}
dag = DAG('test_dag_id', default_args=args)
_config = {
'config' : '/a/b/c/d/prod.config'
'master' : 'yarn'
'deploy-mode' : 'client'
'sparkMaster' : 'yarnclient'
'class' : 'core.Driver'
'driver_classpath': 'parquet.jar',
'jars': '/a/b/c/d/test.jar',
'total_executor_cores': 4,
'executor_cores': 1,
'EXECUTORS_MEM': '8G',
'EXECUTORS_NUM': 500,
'executor-cores' : '1',
'driver-memory' : '8G',
'JOB_NAME' : ' ',
'QUEUE' : ' ',
'verbose' : ' '
'start_date' : ' '
'end_date' : ' '
]
}
operator = SparkSubmitOperator(
task_id='spark_submit_job',
dag=dag,
**_config
)
Run Code Online (Sandbox Code Playgroud)开始日期是您设置一次的内容,它旨在绝对设置,而不是相对于当前日期。
像这样:
from airflow import DAG
dag = DAG(
...
start_date=datetime.datetime(2018, 4, 13),
)
Run Code Online (Sandbox Code Playgroud)
可以将开始日期设置为 delta 之类的datetime.timedelta(days=7),但不建议这样做,因为如果您要删除 DAG(包括所有引用,例如 DAG 运行、任务实例等)并从在另一天划伤。最佳实践是 DAG 是幂等的。
为了向 Spark 提交作业,有一个SparkSubmitOperator包装spark-submitshell 命令。那将是首选。也就是说,你基本上可以用 a 做任何事情BashOperator,所以这也是一个可行的选择。
SparkSubmitOperator 的链接代码对于它接受的每个参数都有详细记录。您可以使用applicationkwarg指向您的 .jar 文件,使用 .jar传递 Spark 配置conf。还有用于传递信息的 kwargs,如执行程序内核和内存。您可以使用application_args将任意参数列表传递给您的 Spark 应用程序。
这是SparkSubmitOperator在 Airflow中使用复制和稍微简化的单元测试的示例。请注意,它用于**从 dict 中分解 kwargs 以初始化 Spark 运算符,但这就是测试的结构方式。您可以像传递 kwarg 一样轻松地传递每个配置值。
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils import timezone
DEFAULT_DATE = timezone.datetime(2017, 1, 1)
args = {
'owner': 'airflow',
'start_date': DEFAULT_DATE
}
dag = DAG('test_dag_id', default_args=args)
_config = {
'conf': {
'parquet.compression': 'SNAPPY'
},
'files': 'hive-site.xml',
'py_files': 'sample_library.py',
'driver_classpath': 'parquet.jar',
'jars': 'parquet.jar',
'packages': 'com.databricks:spark-avro_2.11:3.2.0',
'exclude_packages': 'org.bad.dependency:1.0.0',
'repositories': 'http://myrepo.org',
'total_executor_cores': 4,
'executor_cores': 4,
'executor_memory': '22g',
'keytab': 'privileged_user.keytab',
'principal': 'user/spark@airflow.org',
'name': '{{ task_instance.task_id }}',
'num_executors': 10,
'verbose': True,
'application': 'test_application.py',
'driver_memory': '3g',
'java_class': 'com.foo.bar.AppMain',
'application_args': [
'-f', 'foo',
'--bar', 'bar',
'--start', '{{ macros.ds_add(ds, -1)}}',
'--end', '{{ ds }}',
'--with-spaces', 'args should keep embdedded spaces',
]
}
operator = SparkSubmitOperator(
task_id='spark_submit_job',
dag=dag,
**_config
)
Run Code Online (Sandbox Code Playgroud)
来源:https : //github.com/apache/incubator-airflow/blob/f520990fe0b7a70f80bec68cb5c3f0d41e3e984d/tests/contrib/operators/test_spark_submit_operator.py
| 归档时间: |
|
| 查看次数: |
4217 次 |
| 最近记录: |