是否可以将选项从azkaban工作流程传递到基础工作代码?
我有这样的东西,它适用于硬编码/已知日期,但是我想在执行流程时指定日期:
from azkaban import Job, Project
import datetime
import os
from datetime import datetime, timezone, timedelta
options = {
'start.date' : today.strftime('%Y-%m-%d'), # Can we pass this as an argument to the underlying code?
'day.offset' : 1
}
project = Project('my_project',root=__file__)
project.add_file('my_shell_script.sh', 'my_shell_script.sh')
project.add_job('my_job', Job(options, {'type' : 'command' : 'bash my_shell_script <pass date here?>'}))
project.add_job('my_job', Job(options, {'type' : 'command' : 'java -jar test.jar <pass date here?>'}))
Run Code Online (Sandbox Code Playgroud)
谢谢,Sharath
在Azkaban流中的非相邻作业之间传递参数的一种方法是在需要参数之前对JOB_OUTPUT_PROP_FILE进行操作。必须使用Shell脚本执行此操作,因为JOB_OUTPUT_PROP_FILE变量不可直接用于给定作业。 这种方法将相关信息写入文件,并在需要使用帮助程序脚本之前将其读取。通过在每个步骤写入JOB_OUTPUT_PROP_FILE,可以将参数传递给相邻作业。
在后一个作业需要使用流中第一个作业的运行日期的情况下,首先将相关数据写入文件。在此示例中,当前日期以YYYY-MM-DD格式写入名为rundate.text的本地文件。
#step_1.job
type=command
dependencies=initialize_jobs
command=whoami
command.1=/bin/sh -c "date '+%Y-%m-%d' > rundate.text"
Run Code Online (Sandbox Code Playgroud)
然后,在需要该参数之前,运行一个准备脚本以使该参数可用。
#step_4_preparation.job
type=command
dependencies=step_3
command=whoami
command.1=/bin/bash rd.sh
Run Code Online (Sandbox Code Playgroud)
步骤4准备执行以下Shell脚本(rd.sh)
#!/bin/sh
# this script takes the run_date value from the text file and passes it to Azkaban
# Now, the variable can be used in the next step of the job
RD=$(cat rundate.text)
echo "Now setting Job Output Property File with RD (run date) variable"
echo $RD
#This is the point where the parameter is written in JSON format
#to the JOB_OUTPUT_PROP_FILE, which allows it to be used in the next step
echo '{"RD" : "'"$RD"'"}' > "${JOB_OUTPUT_PROP_FILE}"
Run Code Online (Sandbox Code Playgroud)
然后,在接下来的步骤中,可以使用参数,在此示例中为$ {RD}。
# step_4.job
type=command
dependencies=step_4_preparation
command=whoami
command.1=bash -c "echo ${RD} is the run date"
Run Code Online (Sandbox Code Playgroud)
出色地,
根据 azkaban 文档,唯一的全局流属性可以被覆盖。在Python中,我们可以这样设置全局属性:
project = Project('emr-cluster-creation', root=__file__)
project.properties = {
'emr.cluster.name' : 'default-clustername',
'emr.master.instance.type' : 'r3.xlarge',
'emr.core.instance.type' : 'r3.xlarge',
'emr.task.instance.type' : 'r3.xlarge',
'emr.instance.count' : 11,
'emr.task.instance.count' : 5,
'emr.hadoop.conf.local.path' : 's3n://bucket/hadoop-configuration.json',
'emr.hive.site.s3.path' : 's3n://bucket/hive-site.xml',
'emr.spark.version' : '1.4.0.b',
# 'emr.service.role' : 'some-role', #amazon IAM role.
'emr.service.role' : '\'\'', #amazon IAM role.
'emr.cluster.output' : '\'\''
}
# do something...
Run Code Online (Sandbox Code Playgroud)
这些参数可以作为 ${emr.cluster.name} 传递到底层应用程序/脚本。这将支持传递默认属性值以及在 azkaban 服务器 web-ui 上或使用 azkaban ajax API 覆盖流参数。
| 归档时间: |
|
| 查看次数: |
3848 次 |
| 最近记录: |