Azkaban:将参数传递给基础作业代码

sha*_*ath 5 azkaban

是否可以将选项从azkaban工作流程传递到基础工作代码?

我有这样的东西,它适用于硬编码/已知日期,但是我想在执行流程时指定日期:

from azkaban import Job, Project
import datetime
import os
from datetime import datetime, timezone, timedelta




options = {
            'start.date' : today.strftime('%Y-%m-%d'), # Can we pass this as an argument to the underlying code?
            'day.offset' : 1
            }

project = Project('my_project',root=__file__)
project.add_file('my_shell_script.sh', 'my_shell_script.sh')
project.add_job('my_job', Job(options, {'type' : 'command' : 'bash my_shell_script <pass date here?>'}))
project.add_job('my_job', Job(options, {'type' : 'command' : 'java -jar test.jar <pass date here?>'}))
Run Code Online (Sandbox Code Playgroud)

谢谢,Sharath

eea*_*rly 5

大图:通过将参数写入磁盘来使其持久化

在Azkaban流中的非相邻作业之间传递参数的一种方法是在需要参数之前对JOB_OUTPUT_PROP_FILE进行操作。必须使用Shell脚本执行此操作,因为JOB_OUTPUT_PROP_FILE变量不可直接用于给定作业。 这种方法将相关信息写入文件,并在需要使用帮助程序脚本之前将其读取。通过在每个步骤写入JOB_OUTPUT_PROP_FILE,可以将参数传递给相邻作业。

在作业图之间传递参数


概念

  1. 将文件写入磁盘,其中包含您希望传递的信息
  2. 创建一个帮助程序/准备shell脚本,该脚本在需要该参数的作业之前运行
  3. 在工作中使用参数

在后一个作业需要使用流中第一个作业的运行日期的情况下,首先将相关数据写入文件。在此示例中,当前日期以YYYY-MM-DD格式写入名为rundate.text的本地文件。

#step_1.job
type=command
dependencies=initialize_jobs
command=whoami
command.1=/bin/sh -c "date '+%Y-%m-%d' > rundate.text"
Run Code Online (Sandbox Code Playgroud)

然后,在需要该参数之前,运行一个准备脚本以使该参数可用。

#step_4_preparation.job
type=command
dependencies=step_3
command=whoami
command.1=/bin/bash rd.sh
Run Code Online (Sandbox Code Playgroud)

步骤4准备执行以下Shell脚本(rd.sh)

#!/bin/sh
# this script takes the run_date value from the text file and passes it to Azkaban
# Now, the variable can be used in the next step of the job

RD=$(cat rundate.text)

echo "Now setting Job Output Property File with RD (run date) variable"
echo $RD


#This is the point where the parameter is written in JSON format
#to the JOB_OUTPUT_PROP_FILE, which allows it to be used in the next step
echo '{"RD" : "'"$RD"'"}' > "${JOB_OUTPUT_PROP_FILE}"
Run Code Online (Sandbox Code Playgroud)

然后,在接下来的步骤中,可以使用参数,在此示例中为$ {RD}。

# step_4.job
type=command
dependencies=step_4_preparation
command=whoami
command.1=bash -c "echo ${RD} is the run date"
Run Code Online (Sandbox Code Playgroud)


sha*_*ath 2

出色地,

根据 azkaban 文档,唯一的全局流属性可以被覆盖。在Python中,我们可以这样设置全局属性:

    project = Project('emr-cluster-creation', root=__file__)

project.properties = {
                                                    'emr.cluster.name' : 'default-clustername',
                                                    'emr.master.instance.type' : 'r3.xlarge',
                                                    'emr.core.instance.type' : 'r3.xlarge',
                                                    'emr.task.instance.type' : 'r3.xlarge',
                                                    'emr.instance.count' : 11,
                                                    'emr.task.instance.count' : 5,
                                                    'emr.hadoop.conf.local.path' : 's3n://bucket/hadoop-configuration.json',
                                                    'emr.hive.site.s3.path' : 's3n://bucket/hive-site.xml',
                                                    'emr.spark.version' : '1.4.0.b',
#                                                   'emr.service.role' : 'some-role', #amazon IAM role.
                                                    'emr.service.role' : '\'\'', #amazon IAM role.
                                                    'emr.cluster.output' : '\'\''
    }

# do something...
Run Code Online (Sandbox Code Playgroud)

这些参数可以作为 ${emr.cluster.name} 传递到底层应用程序/脚本。这将支持传递默认属性值以及在 azkaban 服务器 web-ui 上或使用 azkaban ajax API 覆盖流参数。