'Remove item from Queue after a set interval

By now I've looked at many, if not all, of the asyncio / aiohttp rate limiters out there. They all failed to solve the rate limit that is imposed on the REST API that I'm trying to work with. The REST API gives me a 504 (Service Unavailable) whenever I feed it > 100 requests per minute.

I tried limiting the number of requests sent in one minute, but still I got the 504. As it turns out, it's because the server includes the time it take to handle te request. Then I tried limiting the number of requests based on the nr of received responses. That didn't work either because now, I was sending it well over a 100 requests by the time the first 100 had returned.

The LeakyBucket mechanism doesn't work, because it let's through request 101, 102, 103.. in a steady pace after the first 100 have bursted trough. Resulting in >100 requests per minute. The TokenBucket mechanism slows down everything, even when for example when only 90 requests are made.

What I'm basically trying to do is; to create a queue that gets added upon when a request is being made, and clears the item in the queue one minute after the request was completed.

I like how the asyncio.Queue already implements the wait whenever you try to put a new item in the queue. The code below works, but it keeps the code running for the entire minute, even when just one request is made.

import asyncio


class Ratelimiter:
    def __init__(self, max_calls: int, period: int):
        self.period = period
        self.queue = asyncio.Queue(maxsize=max_calls)

    async def __aenter__(self) -> "Ratelimiter":
        await self.queue.put(None)
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb) -> None:
        await asyncio.sleep(self.interval)
        await self.queue.get()
        self.queue.task_done()
        return None


Solution 1:[1]

I've found my solution; by having a separate Thread doing to countdown timer, the mainthread can continue.

import asyncio
import collections
import threading
import time


class Ratelimiter:
    def __init__(self, max_calls: int, period: int):
        self.max_calls = max_calls
        self.interval = period
        self.queue = collections.deque()

    async def __aenter__(self) -> "Ratelimiter":
        while len(self.queue) >= self.max_calls:
            await asyncio.sleep(
                self.queue[0] + self.interval - time.monotonic()
            )
        self.queue.append(time.monotonic())
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb) -> None:
        threading.Thread(target=self._sleep_and_remove).start()
        return None

    def _sleep_and_remove(self):
        time.sleep(self.interval)
        self.queue.popleft()

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Niels Perfors