Apache Airflow broken migration prevents running in standalone mode
I installed Apache Airflow locally using pip and ran it in standalone mode as instructed in the docs.
The problem
After a restart, running airflow standalone again gives me this error:
standalone | Starting Airflow Standalone
standalone | Checking database is initialized
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
Traceback (most recent call last):
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
sqlite3.OperationalError: duplicate column name: max_tries
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/root/miniconda3/envs/localspark/bin/airflow", line 8, in <module>
sys.exit(main())
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
args.func(args)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/commands/standalone_command.py", line 48, in entrypoint
StandaloneCommand().run()
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/commands/standalone_command.py", line 64, in run
self.initialize_database()
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/commands/standalone_command.py", line 170, in initialize_database
db.initdb()
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/db.py", line 591, in initdb
upgradedb(session=session)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/db.py", line 824, in upgradedb
command.upgrade(config, 'heads')
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/command.py", line 298, in upgrade
script.run_env()
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/script/base.py", line 489, in run_env
util.load_python_file(self.dir, "env.py")
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/util/pyfiles.py", line 98, in load_python_file
module = load_module_py(module_id, path)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/util/compat.py", line 184, in load_module_py
spec.loader.exec_module(module)
File "<frozen importlib._bootstrap_external>", line 843, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/migrations/env.py", line 107, in <module>
run_migrations_online()
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/migrations/env.py", line 101, in run_migrations_online
context.run_migrations()
File "<string>", line 8, in run_migrations
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/runtime/environment.py", line 846, in run_migrations
self.get_context().run_migrations(**kw)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/runtime/migration.py", line 518, in run_migrations
step.migration_fn(**kw)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py", line 60, in upgrade
op.add_column('task_instance', sa.Column('max_tries', sa.Integer, server_default="-1"))
File "<string>", line 8, in add_column
File "<string>", line 3, in add_column
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/operations/ops.py", line 1927, in add_column
return operations.invoke(op)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/operations/base.py", line 374, in invoke
return fn(self, operation)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/operations/toimpl.py", line 132, in add_column
operations.impl.add_column(table_name, column, schema=schema, **kw)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/ddl/impl.py", line 237, in add_column
self._exec(base.AddColumn(table_name, column, schema=schema))
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/ddl/impl.py", line 140, in _exec
return conn.execute(construct, *multiparams, **params)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/sql/ddl.py", line 72, in _execute_on_connection
return connection._execute_ddl(self, multiparams, params)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1068, in _execute_ddl
ret = self._execute_context(
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
self._handle_dbapi_exception(
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
util.raise_(
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) duplicate column name: max_tries
[SQL: ALTER TABLE task_instance ADD COLUMN max_tries INTEGER DEFAULT '-1']
(Background on this error at: http://sqlalche.me/e/13/e3q8)
When I try to reset the database using airflow db reset, it returns this:
airflow db reset
DB: sqlite:////root/airflow/airflow.db
This will drop existing tables if they exist. Proceed? (y/n)y
[2021-10-18 15:17:43,572] {db.py:831} INFO - Dropping tables that exist
[2021-10-18 15:17:43,704] {migration.py:154} INFO - Context impl SQLiteImpl.
[2021-10-18 15:17:43,705] {migration.py:157} INFO - Will assume non-transactional DDL.
[2021-10-18 15:17:43,800] {db.py:823} INFO - Creating tables
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> e3a246e0dc1, current schema
INFO [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/ddl/sqlite.py:40 UserWarning: Skipping unsupported ALTER for creation of implicit constraint. Please refer to the batch mode feature which allows for SQLite migrations using a copy-and-move strategy.
INFO [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance
INFO [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
Traceback (most recent call last):
File "/root/miniconda3/envs/localspark/bin/airflow", line 8, in <module>
sys.exit(main())
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
args.func(args)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/commands/db_command.py", line 39, in resetdb
db.resetdb()
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/db.py", line 839, in resetdb
initdb(session=session)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/db.py", line 591, in initdb
upgradedb(session=session)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/db.py", line 824, in upgradedb
command.upgrade(config, 'heads')
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/command.py", line 298, in upgrade
script.run_env()
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/script/base.py", line 489, in run_env
util.load_python_file(self.dir, "env.py")
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/util/pyfiles.py", line 98, in load_python_file
module = load_module_py(module_id, path)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/util/compat.py", line 184, in load_module_py
spec.loader.exec_module(module)
File "<frozen importlib._bootstrap_external>", line 843, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/migrations/env.py", line 107, in <module>
run_migrations_online()
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/migrations/env.py", line 101, in run_migrations_online
context.run_migrations()
File "<string>", line 8, in run_migrations
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/runtime/environment.py", line 846, in run_migrations
self.get_context().run_migrations(**kw)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/alembic/runtime/migration.py", line 540, in run_migrations
raise util.CommandError(
alembic.util.exc.CommandError: Migration "upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance" has left an uncommitted transaction opened; transactional_ddl is False so Alembic is not committing transactions
The command airflow db check-migrations returns this:
[2021-10-18 15:19:12,201] {migration.py:154} INFO - Context impl SQLiteImpl.
[2021-10-18 15:19:12,201] {migration.py:157} INFO - Will assume non-transactional DDL.
Traceback (most recent call last):
File "/root/miniconda3/envs/localspark/bin/airflow", line 8, in <module>
sys.exit(main())
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
args.func(args)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/cli/commands/db_command.py", line 54, in check_migrations
db.check_migrations(timeout=args.migration_wait_timeout)
File "/root/miniconda3/envs/localspark/lib/python3.8/site-packages/airflow/utils/db.py", line 643, in check_migrations
raise TimeoutError(
TimeoutError: There are still unapplied migrations after 0 seconds. Migration Head(s) in DB: {'127d2bf2dfa7'} | Migration Head(s) in Source Code: {'7b2661a43ba3'}
Fixes tried
Deleting the Airflow DB (even the whole Airflow home directory) doesn't help. Neither does uninstalling and reinstalling Airflow.
The question
Is there a way to either fix these broken migrations or remove Airflow in a way that also gets rid of this problem?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow