'Lost connection to MySQL server during query with Sanic and Asyncmy (MySQL)

I'm facing an issue I'm having a hard time to identify.

I made a Database context system to wrap requests inside with that creates a connection to Mysql. Here's the full code :

custom/database/database.py

# -*- coding:utf-8 -*-

from sqlalchemy import exc, event
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession as SQLAlchemyAsyncSession
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import Pool, QueuePool  # NullPool
from sqlalchemy.exc import OperationalError
from contextvars import ContextVar
from sanic import Sanic


class EngineNotInitialisedError(Exception):
    pass


class DBSessionContext:
    def __init__(self, read_session: Session, write_session: Session, commit_on_exit: bool = True) -> None:
        self.read_session = read_session
        self.write_session = write_session
        self.commit_on_exit = commit_on_exit

        self.token = None
        self._read = None
        self._write = None

    def _disable_flush(self, *args, **kwargs):
        raise NotImplementedError('Unable to flush a read-only session.')

    async def close(self, exc_type=None, exc_value=None, traceback=None):
        if self._write:
            if exc_value and getattr(exc_value, 'status_code', 500) > 300:
                await self._write.rollback()
            else:
                await self._write.commit()

            try:
                await self._write.close()
            except OperationalError as e:
                if e.orig.args[0] != 2013:  # Lost connection to MySQL server during query
                    raise e

        if self._read:
            try:
                await self._read.close()
            except OperationalError as e:
                if e.orig.args[0] != 2013:  # Lost connection to MySQL server during query
                    raise e

    def set_token(self, token):
        self.token = token

    @property
    def read(self) -> Session:
        if not self._read:
            self._read = self.read_session()
            self._read.flush = self._disable_flush

        return self._read

    @property
    def write(self) -> Session:
        if not self._write:
            self._write = self.write_session()

        return self._write


class AsyncSession(SQLAlchemyAsyncSession):
    async def execute(self, statement, **parameters):
        return await super().execute(statement, parameters)

    async def first(self, statement, **parameters):
        executed = await self.execute(statement, **parameters)
        return executed.first()

    async def all(self, statement, **parameters):
        executed = await self.execute(statement, **parameters)
        return executed.all()


class DBSession:
    def __init__(self):
        self.app = None
        self.read_engine = None
        self.read_session = None
        self.write_engine = None
        self.write_session = None
        self._session = None
        self.context = ContextVar("context", default=None)
        self.commit_on_exit = True

    def init_app(self, app: Sanic) -> None:
        self.app = app
        self.commit_on_exit = self.app.config.get('DATABASE_COMMIT_ON_EXIT', cast=bool, default=True)

        engine_args = {
            'echo': self.app.config.get('DATABASE_ECHO', cast=bool, default=False),
            'echo_pool': self.app.config.get('DATABASE_ECHO_POOL', cast=bool, default=False),
            'poolclass': QueuePool,  # will be used to create a connection pool instance using the connection parameters given in the URL
            # if pool_class is not NullPool:

            # if True will enable the connection pool “pre-ping” feature that tests connections for liveness upon each checkout
            'pool_pre_ping': self.app.config.get('DATABASE_POOL_PRE_PING', cast=bool, default=True),
            # the number of connections to allow in connection pool “overflow”
            'max_overflow': self.app.config.get('DATABASE_MAX_OVERFLOW', cast=int, default=10),
            # the number of connections to keep open inside the connection pool
            'pool_size': self.app.config.get('DATABASE_POOL_SIZE', cast=int, default=100),
            # this setting causes the pool to recycle connections after the given number of seconds has passed
            'pool_recycle': self.app.config.get('DATABASE_POOL_RECYCLE', cast=int, default=3600),
            # number of seconds to wait before giving up on getting a connection from the pool
            'pool_timeout': self.app.config.get('DATABASE_POOL_TIMEOUT', cast=int, default=5),
        }

        self.read_engine = create_async_engine(
            self.app.config.get('DATABASE_READ_URL'),
            connect_args={
                'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
            },
            **engine_args
        )

        # @see https://writeonly.wordpress.com/2009/07/16/simple-read-only-sqlalchemy-sessions/
        self.read_session = sessionmaker(
            bind=self.read_engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=False,
            autocommit=False
        )

        self.write_engine = create_async_engine(
            self.app.config.get('DATABASE_WRITE_URL'),
            connect_args={
                'connect_timeout': self.app.config.get('DATABASE_CONNECT_TIMEOUT', cast=int, default=3)
            },
            **engine_args
        )

        self.write_session = sessionmaker(
            bind=self.write_engine,
            expire_on_commit=False,
            class_=AsyncSession,
            autoflush=True
        )

    async def __aenter__(self):
        session_ctx = DBSessionContext(self.read_session, self.write_session, self.commit_on_exit)
        session_ctx.set_token(self.context.set(session_ctx))

        return session_ctx

    async def __aexit__(self, exc_type, exc_value, traceback):
        session_ctx = self.context.get()
        await session_ctx.close(exc_type, exc_value, traceback)

        self.context.reset(session_ctx.token)

    @property
    def read(self) -> Session:
        return self.context.get().read

    @property
    def write(self) -> Session:
        return self.context.get().write


@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
    '''Listener for Pool checkout events that pings every connection before using.
    Implements pessimistic disconnect handling strategy. See also:
    http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic'''

    cursor = dbapi_con.cursor()
    try:
        cursor.execute("SELECT 1")
    except exc.OperationalError as ex:
        if ex.args[0] in (2006,   # MySQL server has gone away
                          2013,   # Lost connection to MySQL server during query
                          2055):  # Lost connection to MySQL server at '%s', system error: %d
            raise exc.DisconnectionError()  # caught by pool, which will retry with a new connection
        else:
            raise

    cursor.close()


db = DBSession()

Using it is quite simple, I do the following. In the router, I made a wrapper that calls the handler with the db initiated:

custom/route.py

class Route:
    async def __call__(self, request: Request, **kwargs):
        async with db:
            response = await self.handler(*args)

            # process the response, such as chaning a str to a text response, etc

        return response

Unfortunately, I noticed that I have a lot of

(2013, 'Lost connection to MySQL server during query')

And I don't know how or why this happens. This happens to relatively small queries (that contains "LIMIT 1" with indexed columns that should be fast)

Here's the full stack trace:

[2022-05-19 09:35:25 +0000] [92185] [ERROR] Exception occurred while handling uri: 'https://api.pdfshift.io/redacted'
Traceback (most recent call last):
  File "asyncmy/connection.pyx", line 610, in asyncmy.connection.Connection._read_bytes
    data = await self._reader.readexactly(num_bytes)
  File "/usr/lib/python3.9/asyncio/streams.py", line 721, in readexactly
    raise exceptions.IncompleteReadError(incomplete, n)
asyncio.exceptions.IncompleteReadError: 0 bytes read on a total of 4 expected bytes

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/dialects/mysql/asyncmy.py", line 92, in execute
    return self.await_(self._execute_async(operation, parameters))
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 76, in await_only
    return current.driver.switch(awaitable)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 129, in greenlet_spawn
    value = await result
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/dialects/mysql/asyncmy.py", line 104, in _execute_async
    result = await self._cursor.execute(operation, parameters)
  File "asyncmy/cursors.pyx", line 180, in execute
    result = await self._query(query)
  File "asyncmy/cursors.pyx", line 365, in _query
    await conn.query(q)
  File "asyncmy/connection.pyx", line 455, in query
    await self._read_query_result(unbuffered=unbuffered)
  File "asyncmy/connection.pyx", line 636, in _read_query_result
    await result.read()
  File "asyncmy/connection.pyx", line 1023, in read
    first_packet = await self.connection.read_packet()
  File "asyncmy/connection.pyx", line 578, in read_packet
    packet_header = await self._read_bytes(4)
  File "asyncmy/connection.pyx", line 618, in _read_bytes
    raise errors.OperationalError(CR_SERVER_LOST, msg) from e
asyncmy.errors.OperationalError: (2013, 'Lost connection to MySQL server during query')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "handle_request", line 83, in handle_request
    )
  File "/var/www/project/www/custom/route.py", line 162, in __call__
    response = await response
  File "/var/www/project/www/apps/webhooks/views.py", line 104, in stripe
    await account.reset_usage()
  File "/var/www/project/www/apps/accounts/models.py", line 133, in reset_usage
    while await db.read.first(query, uuid=self.uuid):
  File "/var/www/project/www/custom/database/database.py", line 73, in first
    executed = await self.execute(statement, **parameters)
  File "/var/www/project/www/custom/database/database.py", line 70, in execute
    return await super().execute(statement, parameters)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/session.py", line 211, in execute
    return await greenlet_spawn(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 134, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1692, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
    ret = self._execute_context(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
    self._handle_dbapi_exception(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
    util.raise_(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/dialects/mysql/asyncmy.py", line 92, in execute
    return self.await_(self._execute_async(operation, parameters))
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 76, in await_only
    return current.driver.switch(awaitable)
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 129, in greenlet_spawn
    value = await result
  File "/var/www/project/env/lib/python3.9/site-packages/sqlalchemy/dialects/mysql/asyncmy.py", line 104, in _execute_async
    result = await self._cursor.execute(operation, parameters)
  File "asyncmy/cursors.pyx", line 180, in execute
    result = await self._query(query)
  File "asyncmy/cursors.pyx", line 365, in _query
    await conn.query(q)
  File "asyncmy/connection.pyx", line 455, in query
    await self._read_query_result(unbuffered=unbuffered)
  File "asyncmy/connection.pyx", line 636, in _read_query_result
    await result.read()
  File "asyncmy/connection.pyx", line 1023, in read
    first_packet = await self.connection.read_packet()
  File "asyncmy/connection.pyx", line 578, in read_packet
    packet_header = await self._read_bytes(4)
  File "asyncmy/connection.pyx", line 618, in _read_bytes
    raise errors.OperationalError(CR_SERVER_LOST, msg) from e
sqlalchemy.exc.OperationalError: (asyncmy.errors.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: SELECT id FROM conversions WHERE [redacted] LIMIT 1]
[parameters: ('redacted',)]
(Background on this error at: https://sqlalche.me/e/14/e3q8)

When connecting to the database, here's the parameter I provide:

DATABASE_POOL_PRE_PING = True
DATABASE_MAX_OVERFLOW = 10
DATABASE_POOL_SIZE = 100
DATABASE_POOL_RECYCLE = 3600
DATABASE_POOL_TIMEOUT = 5
DATABASE_CONNECT_TIMEOUT = 3

(If you need details from the MySQL server side, let me know which command to run and I'll add the output here).

My assumption is that somehow, the connection is not properly closed when exiting the async with db part, so when another requests comes in, the same connection is used, but ultimately, MySQL kills it, causing the above error Lost connection to MySQL server during query

Further details :

  1. The error is the same, but the queries changes, showing that the error is not from a specific part of the code, but related to the connection
  2. I was able to catch this issue when sending a webhook event from Stripe. The error returned by Stripe is "Expired". Which seems to indicate that before being stopped, the connection hangs (probably waiting on the SQL query to finish)
  3. This doesn't happen everytime : I was able to run some webhooks successfully, and other not, for the same event (Stripe), so again, it doesn't seems to be an error with the code related to handling the request (but maybe on how the DB is managed)

Thank you for your help !



Solution 1:[1]

There are a number of issues with your code:

  • num[] can hold 100 values max, but you are allowing the user to enter up to 500 values into it, thus you have the potential for a buffer overflow.

  • you say the valid number of students is 1 to 500, but you are preventing the user from entering 1.

  • lack of adequate error handling when reading the user's input.

  • you are outputting "Number of grades above the Average : " inside the loop for every matching grade, which you say you don't want. You should instead output it once before entering the loop, then use the loop to count the matching grades without outputting each one, then output the final count after the loop is finished.

Try this:

#include <iostream>
#include <limits>
using namespace std;

const int maxStudents = 500;

int main()
{
    int G, N, nAverages = 0;
    float num[maxStudents], sum = 0.0, average;
    
    cout << "Enter the number of Students : ";
    do
    {
        if (cin >> N)
        {
            if (N >= 1 && N <= maxStudents)
                break;

            cout << "Error ! Number of students should be in range of (1 to " << maxStudents << ")." << endl;
        }
        else
        {
            cout << "Error ! Invalid input." << endl;
            cin.clear();
            cin.ignore(numeric_limits<streamsize>::max(), '\n');
        }

        cout << "Enter the number again: ";
    }
    while (true);

    for(G = 0; G < N; ++G)
    {
        cout << G + 1 << ". Enter Mark : ";
        while (!(cin >> num[G]))
        {
            cout << "Error ! Invalid input." << endl;
            cin.clear();
            cin.ignore(numeric_limits<streamsize>::max(), '\n');
            cout << "Enter the mark again: ";
        }
        sum += num[G];
    }

    // find average
    average = sum / N;
    cout << endl << "Grades Average = " << average << endl << endl;
  
    // find Grades above or equal the Average  
    cout << "Grades above or equal the Average : " << endl;
    for (G = 0; G < N; ++G){
        if (num[G] >= average){
            cout << G + 1 << ": " << num[G] << endl;
            ++nAverages;
        }
    }
  
    // Number of grades above the Average  
    cout << endl << "Number of grades above the Average : " << nAverages << endl;
    
    return 0;
}

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Remy Lebeau