Airflow upon starting - MySQL server has gone away

We added a second Airflow database to support our staging Airflow instance, and now both staging and production have intermittent connection issues. We recently upgraded to 2.2.4, but the RDS logs show aborted connections (Got an error reading communication packets) from before the upgrade as well (previous version 1.10.11). On both instances the webserver UI pulls in past DAG runs fine, and the scheduler runs DAGs appropriately and stores dag_run data. When the service is started, however, we receive the error "MySQL server has gone away" (see below for stack trace). airflow db check and airflow db shell both return successful connections. Some relevant Airflow config values are:

sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
sql_alchemy_schema =
sql_alchemy_reconnect_timeout = 300

I've also tried increasing the pool size to 20, and confirmed that our DB instance size should be able to handle at least 90 concurrent connections.
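To put the pool settings and the 90-connection figure side by side, here is a rough sketch of the arithmetic. It assumes that each Airflow process keeps its own SQLAlchemy pool, that one pool can hold at most pool_size + max_overflow connections, and that the settings live in the [core] section of airflow.cfg. The helper names are made up for illustration and are not part of Airflow.

```python
import configparser

# Pool settings copied from the question. The [core] section name is an
# assumption about where these keys live in this Airflow version.
AIRFLOW_CFG = """
[core]
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
"""


def max_connections_per_process(cfg_text, section="core"):
    """Upper bound on connections one pool can hold (hypothetical helper)."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    settings = parser[section]
    if not settings.getboolean("sql_alchemy_pool_enabled"):
        return None  # pooling disabled, so this bound does not apply
    pool_size = settings.getint("sql_alchemy_pool_size")
    max_overflow = settings.getint("sql_alchemy_max_overflow")
    return pool_size + max_overflow


def processes_within_limit(per_process, db_connection_limit):
    """How many fully loaded pools fit under the database connection limit."""
    return db_connection_limit // per_process


per_process = max_connections_per_process(AIRFLOW_CFG)
print(per_process)                              # 15
print(processes_within_limit(per_process, 90))  # 6
```

Under the same assumptions, a pool size of 20 raises the per-process bound to 30, so only three fully loaded pools would fit under 90 connections. This only bounds the connection count; it does not by itself explain the 2006 error.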

[2022-04-28 16:32:03,212] {manager.py:512} WARNING - Refused to delete permission view, assoc with role exists DAG Runs.can_create Admin
Process ForkProcess-19:
Traceback (most recent call last):
 File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
   cursor, statement, parameters, context
 File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
   cursor.execute(statement, parameters)
 File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 206, in execute
   res = self._query(query)
 File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 319, in _query
   db.query(q)
 File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 254, in query
   _mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
 File "/usr/local/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
   self.run()
 File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
   self._target(*self._args, **self._kwargs)
 File "/usr/local/lib/python3.6/site-packages/airflow/dag_processing/manager.py", line 287, in _run_processor_manager
   processor_manager.start()
 File "/usr/local/lib/python3.6/site-packages/airflow/dag_processing/manager.py", line 520, in start
   return self._run_parsing_loop()
 File "/usr/local/lib/python3.6/site-packages/airflow/dag_processing/manager.py", line 585, in _run_parsing_loop
   self._find_zombies()
 File "/usr/local/lib/python3.6/site-packages/airflow/utils/session.py", line 70, in wrapper
   return func(*args, session=session, **kwargs)
 File "/usr/local/lib/python3.6/site-packages/airflow/dag_processing/manager.py", line 1079, in _find_zombies
   LJ.latest_heartbeat < limit_dttm,
 File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3373, in all
   return list(self)
 File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
   return self._execute_and_instances(context)
 File "/usr/local/lib/python3.6/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
   result = conn.execute(querycontext.statement, self._params)
 File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
   return meth(self, multiparams, params)
 File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
   return connection._execute_clauseelement(self, multiparams, params)
 File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
   distilled_params,
 File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
   e, statement, parameters, cursor, context
 File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
   sqlalchemy_exception, with_traceback=exc_info[2], from_=e
 File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
   raise exception
 File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
   cursor, statement, parameters, context
 File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
   cursor.execute(statement, parameters)
 File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 206, in execute
   res = self._query(query)
 File "/usr/local/lib/python3.6/site-packages/MySQLdb/cursors.py", line 319, in _query
   db.query(q)
 File "/usr/local/lib/python3.6/site-packages/MySQLdb/connections.py", line 254, in query
   _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (2006, 'MySQL server has gone away')
[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag.fileloc AS dag_fileloc, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, 
dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash
FROM task_instance INNER JOIN job ON task_instance.job_id = job.id AND job.job_type IN (%s) INNER JOIN dag ON task_instance.dag_id = dag.dag_id INNER JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
WHERE task_instance.state = %s AND (job.state != %s OR job.latest_heartbeat < %s)]
[parameters: ('LocalTaskJob', <TaskInstanceState.RUNNING: 'running'>, <TaskInstanceState.RUNNING: 'running'>, datetime.datetime(2022, 4, 28, 16, 27, 2, 870459))]
(Background on this error at: http://sqlalche.me/e/13/e3q8)


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
