从笔记本运行 databricks 作业

Joe*_*Joe 8 jobs scala python-3.x databricks aws-databricks

我想知道是否可以使用代码从笔记本运行 Databricks 作业,以及如何执行

我有一个包含多个任务和许多贡献者的作业,并且我们创建了一个作业来执行这一切,现在我们希望从笔记本运行该作业来测试新功能,而无需在作业中创建新任务,也可以运行循环执行多次作业,例如:

for i in [1,2,3]:
    run job with parameter i
Run Code Online (Sandbox Code Playgroud)

问候

Geo*_*los 4

你需要做的是:

  1. 安装 databricksapi。%pip install databricksapi==1.8.1

  2. 创建您的作业并返回输出。您可以通过退出笔记本来做到这一点,如下所示:

    import json from databricksapi import Workspace, Jobs, DBFS dbutils.notebook.exit(json.dumps({"result": f"{_result}"}))

如果你想传递一个数据帧,你也必须将它们作为 json 转储传递,databricks 有一些关于这方面的官方文档。一探究竟。

  1. 获取稍后需要的工作 ID。您可以从 databricks 中的职位详细信息中获取它。

  2. 在执行者笔记本中,您可以使用以下代码。

     def run_ks_job_and_return_output(params):
       context = json.loads(dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson())
         # context
       url = context['extraContext']['api_url']
       token = context['extraContext']['api_token']
    
       jobs_instance = Jobs.Jobs(url, token) # initialize a jobs_instance
       runs_job_id = jobs_instance.runJob(****************, 'notebook',
                          params) # **** is the job id
    
       run_is_not_completed = True
       while run_is_not_completed:
         current_run = [run for run in jobs_instance.runsList('completed')['runs'] if run['run_id'] == runs_job_id['run_id'] and run['number_in_job'] == runs_job_id['number_in_job']]
         if len(current_run) == 0:
           time.sleep(30)
         else:
           run_is_not_completed = False
           current_run = current_run[0]
           print( f"Result state:   {current_run['state']['result_state']}, You can check the resulted output in the following link: {current_run['run_page_url']}")
           note_output = jobs_instance.runsGetOutput(runs_job_id['run_id'])['notebook_output']
           return note_output
    
     run_ks_job_and_return_output( { 'parm1' : 'george',
                                        'variable': "values1"})
    
    Run Code Online (Sandbox Code Playgroud)

如果您想并行运行该作业多次,可以执行以下操作。(首先确保您已在作业设置中增加了最大并发运行数)

from multiprocessing.pool import ThreadPool 
pool = ThreadPool(1000) 
results = pool.map(lambda j: run_ks_job_and_return_output( { 'table' : 'george',
                                   'variable': "values1",
                                         'j': j}), 
         [str(x) for x in range(2,len(snapshots_list))])
Run Code Online (Sandbox Code Playgroud)

还可以保存整个 html 输出,但也许您对此不感兴趣。无论如何,我都会在 StackOverflow 上的另一篇文章中回答这个问题。

希望能帮助到你。

  • 这也适用于 azure databricks 吗? (2认同)