Airflow: "Duplicate entry" MySQL integrity error when triggering DAG runs

Paw*_*wel 6 google-cloud-platform airflow airflow-scheduler google-cloud-composer

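I have two Airflow DAGs: a scheduler and a worker. The scheduler runs every minute, polls for new aggregation jobs, and triggers worker jobs. You can find the code for the scheduler job below.

However, out of more than 6000 scheduler runs, 30 failed with the following exception: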

[2019-05-14 11:02:12,382] {models.py:1760} ERROR - (MySQLdb._exceptions.IntegrityError) (1062, "Duplicate entry 'run_query-worker-2019-05-14 11:02:11.000000' for key 'PRIMARY'") [SQL: 'INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'] [parameters: ('run_query', 'worker', datetime.datetime(2019, 5, 14, 11, 2, 11, tzinfo=<Timezone [UTC]>), None, None, None, None, 0, 0, '', 'airflow', None, None, 'default', 1, None, None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.')]
Traceback (most recent call last):
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/cursors.py", line 198, in execute
    res = self._query(query)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/cursors.py", line 304, in _query
    db.query(q)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/connections.py", line 217, in query
    _mysql.connection.query(self, query)
MySQLdb._exceptions.IntegrityError: (1062, "Duplicate entry 'run_query-worker-2019-05-14 11:02:11.000000' for key 'PRIMARY'")

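Given that the vast majority of scheduler runs succeed, I'm inclined to think this is some kind of race condition in Airflow when using manual/external triggers.

Has anyone run into a similar issue?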

LEC*_*LEC 0

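It looks like this problem is related to this open Airflow issue, and that user posted a fix as an answer to this question, which involves changing the execution_date.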
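
To illustrate why changing the execution_date could help, here is a minimal plain-Python sketch that does not use Airflow itself. It assumes that the key in the error message is built from (task_id, dag_id, execution_date) and that the timestamps of externally triggered runs end up truncated to whole seconds, as the `.000000` in the log suggests. `primary_key` and `unique_execution_dates` are hypothetical helpers made up for this example, not Airflow functions, and this is not necessarily the fix from the linked answer.

```python
from datetime import datetime, timedelta, timezone


def primary_key(task_id, dag_id, execution_date):
    """Mimic the (task_id, dag_id, execution_date) key seen in the error message."""
    return (task_id, dag_id, execution_date)


def unique_execution_dates(base, count):
    """Return `count` execution dates, one second apart, starting at `base`.

    Hypothetical helper: microseconds are dropped first, so the dates stay
    distinct even if they are later truncated to whole seconds.
    """
    base = base.replace(microsecond=0)
    return [base + timedelta(seconds=i) for i in range(count)]


# Two triggers fired within the same second (assumed timestamps).
t1 = datetime(2019, 5, 14, 11, 2, 11, 120000, tzinfo=timezone.utc)
t2 = datetime(2019, 5, 14, 11, 2, 11, 870000, tzinfo=timezone.utc)

# After truncation to whole seconds, both triggers map to the same key.
colliding = {
    primary_key("run_query", "worker", t.replace(microsecond=0)) for t in (t1, t2)
}

# Spacing the execution dates explicitly keeps every key distinct.
dates = unique_execution_dates(t1, 3)
distinct = {primary_key("run_query", "worker", d) for d in dates}

print(len(colliding))  # 1
print(len(distinct))   # 3
```

In a real DAG, each generated value would be passed as the execution_date of one triggered worker run. How that value is passed depends on the Airflow version, so check it against the version you are running.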