将参数传递给Airflow中的相关任务的方法是什么?我有很多bashes文件,我正在尝试将此方法迁移到气流,但我不知道如何在任务之间传递一些属性.
这是一个真实的例子:
#sqoop bash template
sqoop_template = """
sqoop job --exec {{params.job}} -- --target-dir {{params.dir}} --outdir /src/
"""
s3_template = """
s3-dist-cp --src= {{params.dir}} "--dest={{params.s3}}
"""
#Task of extraction in EMR
t1 = BashOperator(
task_id='extract_account',
bash_command=sqoop_template,
params={'job': 'job', 'dir': 'hdfs:///account/' + time.now().strftime("%Y-%m-%d-%H-%M-%S")},
dag=dag)
#Task to upload in s3 backup.
t2 = BashOperator(
task_id='s3_upload',
bash_command=s3_template,
params={}, #here i need the dir name created in t1
depends_on_past=True
)
t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)
在t2中,我需要访问在t1中创建的目录名称.
#Execute a valid job sqoop
def sqoop_import(table_name, job_name):
s3, hdfs = …Run Code Online (Sandbox Code Playgroud) 大家下午好.
我有一个关于交易的问题直到春天和Grails拥有我有多个工作运行相同的服务.
我有一个Grails的默认配置服务和一个每5秒运行一次的方法.通过石英作业,但该方法的执行时间超过5秒.要完成的.我的问题是什么时候石英调用这个方法它们是在同一个事务中执行还是每次发生这个时创建一个新的?我知道每次调用quartz都会创建一个新线程,但是不知道它们是否在同一个事务中,例如,如果调用它会给出一个例外来回滚到目前为止所做的所有事情.
我很感激答案.