Flask web app - Handle async methods exceptions
I am trying to handle exceptions that can occur within asynchronous methods of a Flask application deployed on Google App Engine (GAE) and served by gunicorn.
The workflow of the app is simple:
- Check the client inputs (form), create a task and store its data in a SQLAlchemy DB.
- Launch asynchronous computations within a thread (using the concurrent.futures & flask_executor libs).
- Redirect the client to a waiting page, where a JS function checks the task status in the DB every 2s.
- When the async method has finished its job, it updates the task status in the DB.
Our issue was that exceptions raised within the async methods were not propagated, so the app waited endlessly for the task status in the DB to be updated.
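To illustrate the symptom with plain concurrent.futures (a minimal sketch, not the app's code): an exception raised in a submitted function is stored on the Future instead of being raised in the caller, so a status that only the task itself updates stays pending. The `task_status` dict and `failing_task` function are hypothetical stand-ins for the task table and the background computation.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory stand-in for the SQLAlchemy task table.
task_status = {}


def failing_task(task_id):
    # Simulated background computation that crashes before it can
    # update the task status.
    result = 1 / 0  # raises ZeroDivisionError inside the worker thread
    task_status[task_id] = "SUCCESS"
    return result


with ThreadPoolExecutor(max_workers=1) as pool:
    task_status["task-1"] = "PENDING"
    future = pool.submit(failing_task, "task-1")

# Leaving the `with` block waits for the worker; nothing was raised here.
# The error is only visible by asking the Future for it.
stored_error = future.exception()
print(task_status["task-1"], type(stored_error).__name__)
```

The status is still "PENDING" after the worker has died, which matches the endless wait described above.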
As a fix, I check the thread result within the /check_task_status endpoint called by the JS function, and I raise an exception if an error occurred in the thread.
This works well locally, but once deployed on GAE the app crashes systematically. The error is a gunicorn worker timeout (5 min in gunicorn.conf).
[2022-05-23 13:50:00 +0000] [7] [CRITICAL] WORKER TIMEOUT (pid:10)
[2022-05-23 13:50:00 +0000] [10] [INFO] worker received SIGABRT signal
[2022-05-23 13:50:00 +0000] [10] [INFO] Worker exiting (pid: 10)
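For reference, a minimal sketch of checking a future without blocking, using only the standard library. In concurrent.futures, `Future.result()` called without a timeout waits until the task completes, whereas `done()` returns immediately. Whether `executor.futures.result` in flask_executor behaves the same way is an assumption here, and `poll_future` and `slow_task` are hypothetical names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def poll_future(future):
    """Return a status string without making the caller wait."""
    if not future.done():
        return "RUNNING"
    if future.cancelled():
        return "CANCELLED"
    # exception() returns immediately once the future is done.
    if future.exception() is not None:
        return "FAILURE"
    return "SUCCESS"


release = threading.Event()


def slow_task():
    release.wait()  # stands in for a long computation
    raise RuntimeError("computation failed")


pool = ThreadPoolExecutor(max_workers=1)
future = pool.submit(slow_task)

status_while_running = poll_future(future)  # returns at once
release.set()                               # let the task proceed and fail
pool.shutdown(wait=True)
status_after_failure = poll_future(future)
print(status_while_running, status_after_failure)
```

A status endpoint built this way returns on every poll, whatever the state of the background task.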
Here are my questions:
- How can my fix make the gunicorn server crash? How can I work around this issue?
- Do you have any implementation advice on how to handle thread exceptions within a Flask app?
For your information, I chose flask_executor because it is a Flask extension that makes it easy to work with concurrent.futures. I have read about other workarounds, but they do not meet my needs:
- Use Flask 2.0 asynchronous support - does not work, as I want to redirect to a waiting HTML page right after launching the async call. (yt tutorial)
- Use Celery / Redis instead of concurrent.futures - seems over-engineered for my use case. (SO post)
Here is a code snippet to make my explanation easier to follow:
import json
import time

from flask import Flask, request, render_template, redirect
from flask_executor import Executor
from flask_sqlalchemy import SQLAlchemy

from app import GeoToolTaskInfo
from models import ExitStatus

# create app
app = Flask(__name__)

# create thread executor
executor = Executor(app)
app.config['EXECUTOR_TYPE'] = 'thread'
app.config['EXECUTOR_MAX_WORKERS'] = 1
app.config['EXECUTOR_PROPAGATE_EXCEPTIONS'] = True

# create db connection
db = SQLAlchemy(app)


@app.route('/foo', methods=['GET', 'POST'])
def foo():
    # get form values
    file_values = json.loads(request.form.get('fileValues', '{}'))

    # add task in DB
    task_info = GeoToolTaskInfo(**file_values)
    db.session.add(task_info)
    db.session.commit()

    # launch the background computation and keep its future under a key
    executor.submit_stored(
        future_key=task_info.id,
        fn=process_async_method,
        **file_values,
    )
    return redirect(f'waiting_page/{task_info.id}')


def process_async_method(**kwargs):
    # process long background task... error may occur and is not propagated
    time.sleep(6000)
    task_info = GeoToolTaskInfo.query.get(kwargs['task_id'])
    task_info.status = ExitStatus.SUCCESS
    db.session.commit()


@app.route("/check_task_status", methods=['GET', 'POST'])
def check_status():
    # method called every 2s by the /waiting_page
    # if an error occurred within the thread, redirect to failure page
    req = request.get_json()
    task_id = req['task']
    task = GeoToolTaskInfo.query.get(task_id)
    try:
        thread_name = f"{task.use_case}_{task.method}_{task_id}"
        executor.futures.result(thread_name)
    except Exception:
        task.status = ExitStatus.FAILURE.value
        db.session.commit()
        return render_template("failure.html",
                               foo=task.foo,
                               bar=task.bar,
                               baz=task.baz,
                               task_id=req['task']
                               )
    # ... Check db task status and redirect to computations results html page
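One possible alternative to inspecting the future inside the polling endpoint, sketched with plain concurrent.futures under stated assumptions: attach a done callback that records the outcome, so the endpoint only has to read the stored status. The `task_status` dict, `record_outcome` and `compute` are hypothetical. In a real Flask app the callback would also need an application context to write to the database, which is left out here.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical in-memory stand-in for the task table.
task_status = {}


def record_outcome(task_id):
    """Build a callback that stores the final status of one task."""
    def callback(future):
        if future.cancelled():
            task_status[task_id] = "CANCELLED"
        elif future.exception() is not None:
            task_status[task_id] = "FAILURE"
        else:
            task_status[task_id] = "SUCCESS"
    return callback


def compute(fail):
    # Simulated background computation that may raise.
    if fail:
        raise RuntimeError("computation failed")
    return 42


with ThreadPoolExecutor(max_workers=1) as pool:
    for task_id, fail in (("ok", False), ("bad", True)):
        task_status[task_id] = "PENDING"
        future = pool.submit(compute, fail)
        # The callback runs when the future completes, or right away
        # if it has already completed.
        future.add_done_callback(record_outcome(task_id))

print(task_status)
```

With this layout a failure is written to the store as soon as the task ends, and the polling request never has to wait on the thread.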
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
