我有两个 Airflow DAG - 调度程序和工作人员。调度程序每分钟运行一次,轮询新的聚合作业并触发辅助作业。您可以在下面找到调度程序作业的代码。
然而,在 6000 多个调度程序作业中,有 30 个运行失败,异常情况如下:
[2019-05-14 11:02:12,382] {models.py:1760} ERROR - (MySQLdb._exceptions.IntegrityError) (1062, "Duplicate entry 'run_query-worker-2019-05-14 11:02:11.000000' for key 'PRIMARY'") [SQL: 'INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'] [parameters: ('run_query', 'worker', datetime.datetime(2019, 5, 14, 11, 2, 11, tzinfo=<Timezone [UTC]>), None, None, …Run Code Online (Sandbox Code Playgroud) google-cloud-platform airflow airflow-scheduler google-cloud-composer