ray server congested queue - how best to time out old requests?

Context

I have a client service making requests to a Ray server. This client service has a timeout it is willing to wait for each request. When the Ray server gets a bit overloaded with requests, I have observed periods of consecutive timeouts on the client side. This comes alongside a progressive memory increase on the Ray server - see the screenshot below.

It seems like the way I am using Ray queues up requests and serves them first in, first out. I am wondering what the best way would be to tell the Ray server to prioritize requests differently, so that queued requests that are already older than the client timeout are dropped and fail immediately, in favor of more recent requests that still have a chance of being completed in time. This would avoid periods where none of the requests succeed.
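The core idea, independent of Ray, can be sketched with the standard library alone: each queued item carries its enqueue timestamp, and the worker discards items whose age already exceeds the client timeout instead of wasting work on them. (This is a hypothetical sketch; `drain_queue` and its signature are my own names, not a Ray API.)

```python
import time
from collections import deque

TIMEOUT = 3.0  # seconds a client is willing to wait (assumption)


def drain_queue(queue: deque, now: float) -> list:
    """Pop and process queued items, skipping any older than TIMEOUT."""
    processed = []
    while queue:
        enqueued_at, item = queue.popleft()
        if now - enqueued_at > TIMEOUT:
            continue  # stale: the client has already timed out, skip the work
        processed.append(item)
    return processed


# The "old" item (age 4.0s) is dropped; the "fresh" one (age 1.5s) survives.
queue = deque([(0.0, "old"), (2.5, "fresh")])
print(drain_queue(queue, now=4.0))  # -> ['fresh']
```

The same age check is what the `TestAnalyzerCleanQueue` deployment in the snippet below applies inside its batch handler.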

Ray's deployment decorator has a max_concurrent_queries parameter defaulting to 100, so I am a bit surprised by the memory growth, but in any case that doesn't address the prioritization issue.
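For reference, that backpressure knob sits on the deployment decorator. Note the exact parameter name depends on your Ray version (newer releases rename it to `max_ongoing_requests`), so treat this as a sketch of the configuration rather than a version-exact API:

```python
import ray.serve as serve


# Caps how many requests may be in flight per replica; excess requests
# queue up in the Serve handle rather than in the replica itself.
@serve.deployment(max_concurrent_queries=100)
class TestAnalyzer:
    ...
```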

[screenshot: Ray server memory usage increasing progressively during the timeout periods]

Reproducibility

In the small self-contained Python snippet below, I've illustrated how we could clean up the queue and skip the work for requests that are already too old. This improves the success rate a bit, as shown by the printed results.

But I am wondering:

  • Is this the most conventional way to do it?
  • Are there any Ray Serve utilities for this?

I think it's common practice to prevent this behavior on the client side, by reducing the request rate when the service is timing out too often so that it can catch up. Do you think that's typically a better strategy than handling the issue server-side?

import logging
import time
from typing import Dict, List

import ray
import ray.serve as serve
import requests
from requests.exceptions import ReadTimeout
from starlette.requests import Request

TIMEOUT = 3
SLEEPING_TIME = 1


@serve.deployment
class TestAnalyzerCleanQueue:
    @serve.batch(max_batch_size=1)
    async def handle_batch(self, batch: List[Dict]) -> List[int]:
        t = time.time()
        # Age of the freshest element in the batch; if even that one has
        # exceeded the client timeout, no caller is still waiting for us.
        min_waited_time = min(t - r["time"] for r in batch)
        if min_waited_time > TIMEOUT:
            logging.warning(f"all batched elements have been waiting in queue for over {TIMEOUT}s, skipping")
            return [-1] * len(batch)
        logging.info(f"Len batch: {len(batch)}")
        time.sleep(SLEEPING_TIME)  # blocking sleep simulates CPU-bound work
        return [r["item"] for r in batch]

    async def __call__(self, request: Request):
        return {"item": await self.handle_batch(await request.json())}


@serve.deployment
class TestAnalyzerAccumulateQueue:
    @serve.batch(max_batch_size=1)
    async def handle_batch(self, batch: List[Dict]) -> List[int]:
        # No staleness check: requests are processed FIFO regardless of age.
        logging.info(f"Len batch: {len(batch)}")
        time.sleep(SLEEPING_TIME)  # blocking sleep simulates CPU-bound work
        return [r["item"] for r in batch]

    async def __call__(self, request: Request):
        return {"item": await self.handle_batch(await request.json())}


@ray.remote
def send_query(item: int, clean_queue: bool = False):
    url = (
        "http://0.0.0.0:8000/TestAnalyzerCleanQueue"
        if clean_queue
        else "http://0.0.0.0:8000/TestAnalyzerAccumulateQueue"
    )
    try:
        r = requests.post(url, json={"item": item, "time": time.time()}, timeout=TIMEOUT).json()["item"]
        return r
    except ReadTimeout:
        return "TIMEOUT"


if __name__ == "__main__":
    serve.start(detached=True, http_options={"host": "0.0.0.0"})
    TestAnalyzerCleanQueue.deploy()
    TestAnalyzerAccumulateQueue.deploy()

    print("WITH CLEANING THE QUEUE")
    response_with_cleaning = []
    for i in range(4):
        batch_response = ray.get([send_query.remote(i, clean_queue=True) for i in range(8)])
        print(batch_response)
        response_with_cleaning.append(batch_response)
        time.sleep(TIMEOUT)

    print("WITHOUT CLEANING THE QUEUE")
    response_without_cleaning = []
    for i in range(4):
        batch_response = ray.get([send_query.remote(i, clean_queue=False) for i in range(8)])
        print(batch_response)
        response_without_cleaning.append(batch_response)
        time.sleep(TIMEOUT)

Logs

WITHOUT CLEANING THE QUEUE : 1 successful request

['TIMEOUT', 1, 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT']
['TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT']
['TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT']
['TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT']

WITH CLEANING THE QUEUE : 6 successful requests

['TIMEOUT', 'TIMEOUT', 'TIMEOUT', 3, 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 7]
[0, 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT']
[0, 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT']
[0, 1, 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT', 'TIMEOUT']


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
