我有这个 Python 文件:
class Get:
def __init__(self, i):
self.i = get_date(i)
self.df = self.get_file()
def get_file(self):
try:
...
return df
except Exception as e:
return ...
def get_date(self,i):
dt = datetime.now() - timedelta(days=i)
return dt.strftime("%Y-%m-%d")
def put(self,df):
....
class Fix:
def __init__(self,df):
....
if __name__ == '__main__':
for i in range(4, 0, -1):
get = Get(i)
fix = Fix(get.df)
get.put(fix.df)
Run Code Online (Sandbox Code Playgroud)
基本上这段代码生成 4 个最后的日期并在这些日期上运行函数(更新统计信息等...)
起初我想将每个函数转换为 PythonOperator 然后安排它,但我认为这行不通。我不知道如何转换 Classes 和它们之间传输的参数。
如果我在 2018 年 6 月 12 日及以下运行它,代码会执行以下操作:

是否有我可以使用的模板或任何建议?
小智 5
您可以使用 BashOperator 执行脚本,而无需对脚本进行任何更改:
dag = DAG('{NAME_OF_THE_DAG}', schedule_interval='daily',
default_args=default_args)
t1 = BashOperator(
task_id = '{NAME_OF_TASK}',
dag = dag,
bash_command = python {NAME_OF_THE_FILE_TO_EXECUTE}.py')
Run Code Online (Sandbox Code Playgroud)
或使用 PythonOperator:
更新您的代码以在脚本中创建main函数:
def main():
for i in range(4, 0, -1):
get = Get(i)
fix = Fix(get.df)
get.put(fix.df)
Run Code Online (Sandbox Code Playgroud)定义并执行 dag:
dag = DAG('{NAME_OF_THE_TASK}', schedule_interval = 'daily',
default_args=default_args)
t1 = PythonOperator(
task_id = '{NAME_OF_TASK}',
dag = dag,
python_callable = main)
Run Code Online (Sandbox Code Playgroud)| 归档时间: |
|
| 查看次数: |
2001 次 |
| 最近记录: |