FastAPI/Uvicorn request hanging during invocation of call_next (path operation function) in middleware

We have a machine learning model inside a docker container running on EC2.

We use Cortex.dev to autoscale GPUs.

Non-deterministically, requests hang inside the call_next call in the FastAPI middleware. Unfortunately, the issue is not reproducible on demand.

The Middleware pre-request print line gets logged, but the first print statement in the path operation function never gets logged.

Things we tried:

  • Running Uvicorn with 1 worker
  • Running without async for the run function
  • Running with bytes as the parameter type for image instead of UploadFile

None of these changes solved the hanging issue, but this is the most performant configuration.
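One of the attempts above (dropping async from the run function) touches a common cause of this symptom: a synchronous, GPU-bound pred.predict call inside an async def handler blocks the single event loop, so other requests stall after the middleware's pre-request log but before their handler's first print. This is a possible contributor, not a confirmed diagnosis. A minimal sketch of keeping the handler async while offloading the blocking call to a worker thread (blocking_predict is a hypothetical stand-in for the real model call):

```python
import asyncio
import time


# Hypothetical stand-in for the blocking model call (pred.predict).
def blocking_predict(payload):
    time.sleep(0.1)  # simulates CPU/GPU-bound work that would block the loop
    return {"result": payload["image"][:4]}


async def run_inference(payload):
    # run_in_executor moves the blocking call onto a worker thread,
    # so the event loop stays free to accept other requests meanwhile.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_predict, payload)


result = asyncio.run(run_inference({"image": b"\x89PNG...", "renderFactor": 12}))
print(result)
```

FastAPI ships an equivalent helper, run_in_threadpool from fastapi.concurrency, for the same purpose. Whether this fully explains the hang here is uncertain, but it keeps slow inference from starving the event loop.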

  1. Does this mean the issue is with FastAPI and not Uvicorn?

  2. If yes, what could cause FastAPI to hang? If no, where is the problem and what would fix it?

Dockerfile

FROM nvidia/cuda:11.4.0-runtime-ubuntu18.04

WORKDIR /usr/src/app

RUN apt-get -y update && \
    apt-get install -y --fix-missing \
    build-essential \
    cmake \
    python3 \
    python3-pip \
    ffmpeg \
    libsm6 \
    libxext6 \
    && apt-get clean && rm -rf /tmp/* /var/tmp/*

ADD ./requirements.txt ./

# install our dependencies
RUN python3 -m pip install --upgrade pip && python3 -m pip install -r requirements.txt && apt-get clean && rm -rf /tmp/* /var/tmp/*

ADD ./ ./

ENV LC_ALL=C.UTF-8 
ENV LANG=C.UTF-8

EXPOSE 8080

CMD uvicorn api:app --host 0.0.0.0 --port 8080 --workers 2

api.py

from my_predictor import PythonPredictor
from typing import Optional
from datetime import datetime
import time
from starlette.responses import Response

from fastapi import FastAPI, File, UploadFile, Form, Response, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

origins = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    cortex_id = request.headers.get('x-request-id')
    start_time = time.time()
    print("Cortex ID: " + cortex_id + ". > Middleware pre-request. Time stamp: " + str(start_time), flush=True)

    response = await call_next(request)

    process_time = time.time() - start_time
    print("Cortex ID: " + cortex_id + ". > Middleware post-response. Duration: " + str(process_time), flush=True)

    return response



@app.post("/")
async def run(request: Request, image: UploadFile = File(...), renderFactor:Optional[int] = Form(12), requestId:Optional[str] = Form('-1'),include_header:Optional[str] = Form('bin')):

    try:
        cortexId = request.headers.get('x-request-id')
        print("Cortex ID: " + cortexId + ". Request ID: " + requestId + " >>> Request received. Time stamp: " + str(datetime.now()))

        start = time.time()
    
        image = await image.read()

        payload = {}
        payload['image'] = image
        payload['renderFactor'] = renderFactor
        payload['requestId'] = requestId
        payload['include_header'] = include_header
        
        response = pred.predict(payload)

        end = time.time()

        totalTime = round(end - start, 2)

        print("Cortex ID: " + cortexId + ". Request ID: " + requestId + " > Request processed. Duration: " + str(totalTime) + " seconds. Time stamp: " + str(datetime.now()))

        if totalTime > 5:
            print("Long request detected. Duration: " + str(totalTime))

        return response
        
    except Exception as error:
        end = time.time()
        print(str(error))
        print("Cortex ID: " + cortexId + ". Request ID: " + requestId + " > Error. Duration: " + str(round(end - start, 2)) + " seconds . Time stamp: " + str(datetime.now()))

        raise HTTPException(status_code = 500, detail = str(error))

config = {}
pred = PythonPredictor(config)


Solution 1:[1]

Suggestion 1

From your code I can see that you import Response from both the starlette.responses package and from fastapi, which may be causing the hanging issue. Keep only one of the two imports:

from starlette.responses import Response

# remove the Response from fastapi
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request

Suggestion 2

If the issue still persists:

The FastAPI documentation notes that the Request imported from the fastapi package actually comes from Starlette (see the Starlette Requests documentation). In FastAPI's own words: "You could also use from starlette.requests import Request. FastAPI provides it as a convenience for you, the developer. But it comes directly from Starlette."

Replace from fastapi import Request with from starlette.requests import Request.

There was a similar issue reported in the official FastAPI GitHub issues in this link, where the application was run with uvicorn <file>:app. The code block below, implemented directly with starlette.requests, did not produce the hanging, suggesting the issue lies in FastAPI rather than Uvicorn.

from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse

app = Starlette()


@app.middleware("http")
async def func(request: Request, call_next):
    #print(await request.json())
    return await call_next(request)


@app.route('/', methods=["POST"])
def homepage(request):
    return JSONResponse({"Hello": "World"})

Ensure that you use starlette.requests and starlette.responses in your code, as below:

import time

from starlette.responses import Response
from starlette.requests import Request

# Request and Response removed from fastapi, as they are referenced directly from starlette
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

origins = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    cortex_id = request.headers.get('x-request-id')
    start_time = time.time()
    print("Cortex ID: " + cortex_id + ". > Middleware pre-request. Time stamp: " + str(start_time), flush=True)

    response = await call_next(request)

    process_time = time.time() - start_time
    print("Cortex ID: " + cortex_id + ". > Middleware post-response. Duration: " + str(process_time), flush=True)

    return response

Solution 2:[2]

The Basic Info, Root Cause & the Rant

Hey, I have spent a fair amount of time stuck on this hanging issue (in critical apps at my org that use multiple custom middlewares). It happens because middlewares declared with @app.middleware("http") are, under the hood, created by inheriting from Starlette's BaseHTTPMiddleware, so the problem also affects middlewares that inherit from BaseHTTPMiddleware explicitly. The reasons are quite complex, and this is what I have understood so far:

  1. From here (GitHub Starlette issue) and here (GitHub FastAPI issue), I learnt that this method wraps the response in a StreamingResponse, which has known issues.
  2. From here (GitHub Starlette issue), I learnt that one cause of the hanging is that awaiting request.json() is allowed only once in a request's life-cycle, and BaseHTTPMiddleware creates a second Request object of its own; reading the body from both requests causes the hang.

The last link also mentions another cause of the hanging: because of the StreamingResponse, the response stream gets exhausted on the first read, so a second read waits for data indefinitely, which is the hang. (First and second read here refer to the ASGI messages exchanged between client and app, with types such as http.response.start and http.response.body.)
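To illustrate a workaround for the double-read problem at the ASGI level: a middleware that needs the request body can drain it from the client exactly once, then replay the buffered copy to the inner app, so downstream code can still read it. A minimal, self-contained sketch (plain dicts and callables stand in for real ASGI plumbing; BufferedBodyMiddleware and echo_app are hypothetical names):

```python
import asyncio


class BufferedBodyMiddleware:
    """Pure-ASGI sketch (no BaseHTTPMiddleware): read the request body
    once, then replay it so the downstream app can still consume it."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # Drain the whole body from the client exactly once.
        body = b""
        more_body = True
        while more_body:
            message = await receive()
            body += message.get("body", b"")
            more_body = message.get("more_body", False)

        # `body` can be inspected/logged here without consuming it
        # from the downstream app's point of view.

        async def replay():
            # Hand the buffered body to the inner app as a single message.
            return {"type": "http.request", "body": body, "more_body": False}

        await self.app(scope, replay, send)


# Minimal fake inner app that echoes the body length, proving the replay works.
async def echo_app(scope, receive, send):
    message = await receive()
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body",
                "body": str(len(message["body"])).encode()})


async def demo():
    sent = []

    async def client_receive():
        return {"type": "http.request", "body": b"hello", "more_body": False}

    async def client_send(message):
        sent.append(message)

    app = BufferedBodyMiddleware(echo_app)
    await app({"type": "http"}, client_receive, client_send)
    return sent


sent = asyncio.run(demo())
print(sent[-1]["body"])  # downstream app saw all 5 bytes
```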

The Solution

So, avoid anything built on BaseHTTPMiddleware. To work around this, I wrote all my custom middlewares against the ASGI specification given here.

You can make your custom middleware like this:

import time

from starlette.types import ASGIApp, Message, Receive, Scope, Send

class LogProcessingTime:

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send):

        start_time = time.time()

        async def send_wrapper(message: Message):
            # This captures the response coming from the app layer;
            # the response body arrives in messages whose type is
            # "http.response.body"
            if message["type"] == "http.response.body":
                process_time = time.time() - start_time
                # you can now log process_time any way you prefer

            await send(message)

        await self.app(scope, receive, send_wrapper)


# you can add this to your app this way:
app.add_middleware(LogProcessingTime)
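The pattern can be sanity-checked without running a server by driving it with a hand-rolled inner app. The sketch below is a standalone variant of the class above (plain callables replace the starlette.types aliases, and record_time is a hypothetical logging hook, not part of the original answer):

```python
import asyncio
import time


class LogProcessingTime:
    """Standalone variant of the send-wrapper pattern above."""

    def __init__(self, app, record_time):
        self.app = app
        self.record_time = record_time  # hypothetical hook for your logger

    async def __call__(self, scope, receive, send):
        start_time = time.time()

        async def send_wrapper(message):
            # The body message marks the end of the response.
            if message["type"] == "http.response.body":
                self.record_time(time.time() - start_time)
            await send(message)

        await self.app(scope, receive, send_wrapper)


# Minimal inner app that emits a start message followed by a body message.
async def inner_app(scope, receive, send):
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})


durations = []
app = LogProcessingTime(inner_app, durations.append)


async def noop_receive():
    return {"type": "http.request", "body": b"", "more_body": False}


sent = []


async def collect(message):
    sent.append(message)


asyncio.run(app({"type": "http"}, noop_receive, collect))
print(len(durations), sent[-1]["body"])  # one duration recorded, body passed through
```

Because the wrapper only observes messages and forwards them unchanged, it avoids the response re-reading that BaseHTTPMiddleware performs, which is the behavior implicated in the hangs above.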

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution sources:

  • Solution 1: (no author listed)
  • Solution 2: raghavsikaria