Paw*_*wel 6 google-cloud-platform airflow airflow-scheduler google-cloud-composer
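I have two Airflow DAGs: a scheduler and a worker. The scheduler runs every minute, polls for new aggregation jobs, and triggers worker jobs. You can find the code for the scheduler job below.
However, out of more than 6,000 scheduler runs, 30 failed with the following exception: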
[2019-05-14 11:02:12,382] {models.py:1760} ERROR - (MySQLdb._exceptions.IntegrityError) (1062, "Duplicate entry 'run_query-worker-2019-05-14 11:02:11.000000' for key 'PRIMARY'") [SQL: 'INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'] [parameters: ('run_query', 'worker', datetime.datetime(2019, 5, 14, 11, 2, 11, tzinfo=<Timezone [UTC]>), None, None, None, None, 0, 0, '', 'airflow', None, None, 'default', 1, None, None, None, b'\x80\x04\x95\x03\x00\x00\x00\x00\x00\x00\x00}\x94.')]
Traceback (most recent call last):
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1182, in _execute_context
    context)
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 470, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/cursors.py", line 198, in execute
    res = self._query(query)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/cursors.py", line 304, in _query
    db.query(q)
  File "/opt/python3.6/lib/python3.6/site-packages/MySQLdb/connections.py", line 217, in query
    _mysql.connection.query(self, query)
MySQLdb._exceptions.IntegrityError: (1062, "Duplicate entry 'run_query-worker-2019-05-14 11:02:11.000000' for key 'PRIMARY'")
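Given that the vast majority of scheduler runs succeed, I'm inclined to think this is some kind of race condition in Airflow when using manual/external triggers.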
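To illustrate what the error itself means, here is a minimal sketch, not Airflow's actual code. It uses an in-memory SQLite table in place of the MySQL metadata database, with a simplified `task_instance` schema. It assumes the primary key is the composite (task_id, dag_id, execution_date), which is what the logged duplicate entry suggests, and that the execution date is truncated to whole seconds, as the `.000000` in the logged key hints. The helper names `make_db` and `insert_task_instance` and the two trigger timestamps are hypothetical.

```python
import sqlite3
from datetime import datetime, timezone


def make_db():
    """Create an in-memory stand-in for the metadata DB (simplified schema, assumed key)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance ("
        " task_id TEXT NOT NULL,"
        " dag_id TEXT NOT NULL,"
        " execution_date TEXT NOT NULL,"
        " PRIMARY KEY (task_id, dag_id, execution_date))"
    )
    return conn


def insert_task_instance(conn, task_id, dag_id, execution_date):
    """Insert one row; return True on success, False on a duplicate primary key."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO task_instance (task_id, dag_id, execution_date)"
                " VALUES (?, ?, ?)",
                (task_id, dag_id, execution_date.isoformat()),
            )
        return True
    except sqlite3.IntegrityError:
        return False


conn = make_db()

# Two hypothetical triggers arriving within the same wall-clock second.
# Truncating microseconds (an assumption) makes their keys identical.
first = datetime(2019, 5, 14, 11, 2, 11, 120000, tzinfo=timezone.utc).replace(microsecond=0)
second = datetime(2019, 5, 14, 11, 2, 11, 870000, tzinfo=timezone.utc).replace(microsecond=0)

results = [insert_task_instance(conn, "run_query", "worker", ts) for ts in (first, second)]
print(results)  # [True, False]: the second insert violates the primary key
```

If something like this is what happens, any two inserts for the same task, DAG, and second would collide regardless of which component issues them, so the sketch shows the symptom but does not by itself identify which two code paths are racing.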
Has anyone run into a similar problem?