Apache Airflow: broken migrations prevent running standalone mode

Bar*_*jda 7 python airflow

I installed Apache Airflow locally with pip and ran it in standalone mode, following the instructions in the docs.

Problem

After a reboot, trying to run airflow standalone again produces this error:

standalone | Starting Airflow Standalone
standalone | Checking database is initialized
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
Traceback (most recent call last):
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlite3.OperationalError: duplicate column name: max_tries

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/root/miniconda3/envs/localspark/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/commands/standalone_command.py", line 48, in entrypoint
    StandaloneCommand().run()
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/commands/standalone_command.py", line 64, in run
    self.initialize_database()
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/commands/standalone_command.py", line 170, in initialize_database
    db.initdb()
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/db.py", line 591, in initdb
    upgradedb(session=session)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/db.py", line 824, in upgradedb
    command.upgrade(config, 'heads')
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/command.py", line 298, in upgrade
    script.run_env()
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/script/base.py", line 489, in run_env
    util.load_python_file(self.dir, "env.py")
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/util/pyfiles.py", line 98, in load_python_file
    module = load_module_py(module_id, path)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/util/compat.py", line 184, in load_module_py
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 843, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/migrations/env.py", line 107, in <module>
    run_migrations_online()
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/migrations/env.py", line 101, in run_migrations_online
    context.run_migrations()
  File "<string>", line 8, in run_migrations
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/runtime/environment.py", line 846, in run_migrations
    self.get_context().run_migrations(**kw)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/runtime/migration.py", line 518, in run_migrations
    step.migration_fn(**kw)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py", line 60, in upgrade
    op.add_column('task_instance', sa.Column('max_tries', sa.Integer, server_default="-1"))
  File "<string>", line 8, in add_column
  File "<string>", line 3, in add_column
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/operations/ops.py", line 1927, in add_column
    return operations.invoke(op)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/operations/base.py", line 374, in invoke
    return fn(self, operation)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/operations/toimpl.py", line 132, in add_column
    operations.impl.add_column(table_name, column, schema=schema, **kw)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/ddl/impl.py", line 237, in add_column
    self._exec(base.AddColumn(table_name, column, schema=schema))
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/ddl/impl.py", line 140, in _exec
    return conn.execute(construct, *multiparams, **params)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 72, in _execute_on_connection
    return connection._execute_ddl(self, multiparams, params)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1068, in _execute_ddl
    ret = self._execute_context(
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) duplicate column name: max_tries
[SQL: ALTER TABLE task_instance ADD COLUMN max_tries INTEGER DEFAULT '-1']
(Background on this error at: http://sqlalche.me/e/13/e3q8)

Trying to reset the database with airflow db reset returns:

airflow db reset
DB: sqlite:////root/airflow/airflow.db
This will drop existing tables if they exist. Proceed? (y/n)y
[2021-10-18 15:17:43,572] {db.py:831} INFO - Dropping tables that exist
[2021-10-18 15:17:43,704] {migration.py:154} INFO - Context impl SQLiteImpl.
[2021-10-18 15:17:43,705] {migration.py:157} INFO - Will assume non-transactional DDL.
[2021-10-18 15:17:43,800] {db.py:823} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> e3a246e0dc1, current schema
INFO  [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/ddl/sqlite.py:40 UserWarning: Skipping unsupported ALTER for creation of implicit constraintPlease refer to the batch mode feature which allows for SQLite migrations using a copy-and-move strategy.
INFO  [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO  [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance
INFO  [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO  [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO  [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO  [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO  [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO  [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO  [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO  [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO  [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO  [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO  [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO  [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO  [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO  [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO  [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO  [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO  [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO  [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO  [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
Traceback (most recent call last):
  File "/root/miniconda3/envs/localspark/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/commands/db_command.py", line 39, in resetdb
    db.resetdb()
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/db.py", line 839, in resetdb
    initdb(session=session)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/db.py", line 591, in initdb
    upgradedb(session=session)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/db.py", line 824, in upgradedb
    command.upgrade(config, 'heads')
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/command.py", line 298, in upgrade
    script.run_env()
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/script/base.py", line 489, in run_env
    util.load_python_file(self.dir, "env.py")
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/util/pyfiles.py", line 98, in load_python_file
    module = load_module_py(module_id, path)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/util/compat.py", line 184, in load_module_py
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 843, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/migrations/env.py", line 107, in <module>
    run_migrations_online()
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/migrations/env.py", line 101, in run_migrations_online
    context.run_migrations()
  File "<string>", line 8, in run_migrations
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/runtime/environment.py", line 846, in run_migrations
    self.get_context().run_migrations(**kw)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/runtime/migration.py", line 540, in run_migrations
    raise util.CommandError(
alembic.util.exc.CommandError: Migration "upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance" has left an uncommitted transaction opened; transactional_ddl is False so Alembic is not committing transactions

The command airflow db check-migrations returns this:

[2021-10-18 15:19:12,201] {migration.py:154} INFO - Context impl SQLiteImpl.
[2021-10-18 15:19:12,201] {migration.py:157} INFO - Will assume non-transactional DDL.
Traceback (most recent call last):
  File "/root/miniconda3/envs/localspark/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/commands/db_command.py", line 54, in check_migrations
    db.check_migrations(timeout=args.migration_wait_timeout)
  File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/db.py", line 643, in check_migrations
    raise TimeoutError(
TimeoutError: There are still unapplied migrations after 0 seconds. Migration Head(s) in DB: {'127d2bf2dfa7'} | Migration Head(s) in Source Code: {'7b2661a43ba3'}

Attempted fixes

Deleting the Airflow DB (or even the entire Airflow home directory) did not help. Uninstalling and reinstalling Airflow did not help either.
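
To narrow down what state the SQLite file is in between attempts, a small read-only check like the one below might help. It is only a diagnostic sketch, not a fix: the function name inspect_airflow_db is made up for illustration, the path /root/airflow/airflow.db is taken from the airflow db reset output above, and it assumes Alembic's usual alembic_version table with a version_num column.

```python
import os
import sqlite3
from pathlib import Path


def inspect_airflow_db(db_path):
    """Return (alembic_revisions, task_instance_columns) for a SQLite file.

    Hypothetical helper for diagnosis only. Each list is empty when the
    corresponding table does not exist in the database.
    """
    # Open read-only so the check cannot create or modify the file.
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    try:
        tables = {
            row[0]
            for row in conn.execute(
                "SELECT name FROM sqlite_master WHERE type = 'table'"
            )
        }
        revisions = []
        if "alembic_version" in tables:
            revisions = [
                row[0]
                for row in conn.execute("SELECT version_num FROM alembic_version")
            ]
        # PRAGMA table_info yields no rows for a missing table;
        # the column name is the second field of each row.
        columns = [
            row[1] for row in conn.execute("PRAGMA table_info(task_instance)")
        ]
        return revisions, columns
    finally:
        conn.close()


if __name__ == "__main__":
    db_file = "/root/airflow/airflow.db"  # path from the `airflow db reset` log
    if os.path.exists(db_file):
        revisions, columns = inspect_airflow_db(db_file)
        print("alembic revisions:", revisions)
        print("max_tries already present:", "max_tries" in columns)
    else:
        print(db_file, "does not exist")
```

Running this right after deleting the Airflow home directory, and again after a failed airflow db reset, would show whether max_tries already exists in task_instance while the recorded revision is still 127d2bf2dfa7.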

Question

Is there a way to fix these broken migrations, or to remove Airflow in a way that would also resolve this issue?