Terminate previous Celery task with same task id and run again if created

In my Django project, I have a view class built on TemplateView. I am also using Django Channels and have written a consumer class. I am trying to use a Celery worker to pull queryset data whenever a user refreshes the page. The problem is that if the user refreshes the page again before the task has finished, another task is created, which overloads the worker.

So I used revoke to terminate the previously running task. However, the task id now appears to be revoked permanently, and I don't know how to clear it. I want to be able to run the task again whenever the user requests it.

views.py

class Analytics(LoginRequiredMixin,TemplateView):
    template_name = 'app/analytics.html'
    login_url = '/user/login/'

    def get_context_data(self, **kwargs):
        app.control.terminate(task_id=self.request.user.username+'_analytics')
        print(app.control.inspect().revoked())
        context = super().get_context_data(**kwargs)
        context['sub_title'] = 'Analytics'
        return context

consumers.py

class AppConsumer(AsyncJsonWebsocketConsumer):
    
    async def connect(self):
        await self.accept()
        analytics_queryset_for_selected_devices.apply_async(
            args=[self.scope['user'].username],
            task_id=self.scope['user'].username+'_analytics'
            )
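
Celery workers keep the ids of revoked tasks in memory and skip any task whose id is in that set, which is why reusing `username + '_analytics'` after a revoke never runs again. The toy model below is only a sketch of that behaviour, not Celery's real implementation; `ToyWorker` and its methods are hypothetical names made up for illustration.

```python
class ToyWorker:
    """Simplified stand-in for a worker (hypothetical, not Celery code)."""

    def __init__(self):
        self.revoked = set()   # ids the worker has been told to revoke
        self.executed = []     # ids the worker actually ran

    def revoke(self, task_id):
        # A revoke is remembered by id, not by individual run.
        self.revoked.add(task_id)

    def receive(self, task_id):
        # Any task arriving with a revoked id is discarded.
        if task_id in self.revoked:
            return False
        self.executed.append(task_id)
        return True


worker = ToyWorker()
first_run = worker.receive("alice_analytics")     # runs
worker.revoke("alice_analytics")
second_run = worker.receive("alice_analytics")    # same id: discarded
third_run = worker.receive("alice_analytics_2")   # fresh id: runs
```

In this model the only way to run the task again is to submit it under a new id, which is the direction the solution below takes.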


Solution 1:[1]

Right now I am solving the problem in the following way. In consumers.py I added a disconnect method that revokes the task when the WebSocket is closed.

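from channels.generic.websocket import AsyncJsonWebsocketConsumer

# `app` (the Celery app) and `analytics_queryset_for_selected_devices` (the task)
# are imported from the project's own modules.
counter = 0  # process-wide counter used to build unique task ids


class AppConsumer(AsyncJsonWebsocketConsumer):

    async def connect(self):
        global counter
        await self.accept()
        # Store the id on this connection so disconnect() terminates the right
        # task even when several sockets are open at once.
        self.task_id = self.scope['user'].username + str(counter)
        counter += 1
        analytics_queryset_for_selected_devices.apply_async(
            args=[self.scope['user'].username],
            task_id=self.task_id,
        )

    async def disconnect(self, close_code):
        # The socket is already closed here, so no self.close() call is needed.
        app.control.terminate(task_id=self.task_id, signal='SIGKILL')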

counter is used to make each task id unique. But with this method, every request adds a new task id to the revoked list, which consumes memory. To minimize the issue, I limited the size of the revoked list to 20.

from celery.utils.collections import LimitedSet
from celery.worker import state

state.revoked = LimitedSet(maxlen=20, expires=3600)
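
As a possible variation on the counter, the task id could be generated with `uuid` and the latest id remembered per user, so the previous run can be revoked before a new one is submitted. The sketch below assumes this design; `LatestTaskRegistry`, `start`, `revoke`, and `submit` are hypothetical names, and the Celery calls are passed in as plain callables so the logic can be run on its own.

```python
import uuid
from typing import Callable, Dict


class LatestTaskRegistry:
    """Tracks the most recent task id per user (hypothetical helper)."""

    def __init__(self, revoke: Callable[[str], None]):
        self._revoke = revoke              # called with the id to cancel
        self._latest: Dict[str, str] = {}  # username -> latest task id

    def start(self, username: str, submit: Callable[[str], None]) -> str:
        # Cancel the user's previous run, if one was recorded.
        previous = self._latest.get(username)
        if previous is not None:
            self._revoke(previous)
        # A fresh id every time, so it can never collide with a revoked one.
        task_id = f"{username}_analytics_{uuid.uuid4().hex}"
        self._latest[username] = task_id
        submit(task_id)
        return task_id


# Stand-ins for the Celery calls, so the sketch runs without a broker.
revoked_ids = []
submitted_ids = []
registry = LatestTaskRegistry(revoke=revoked_ids.append)
first_id = registry.start("alice", submitted_ids.append)
second_id = registry.start("alice", submitted_ids.append)
```

In the project, `revoke` might wrap `app.control.revoke(task_id, terminate=True)` and `submit` might wrap `analytics_queryset_for_selected_devices.apply_async(args=[username], task_id=task_id)`. The in-memory dictionary only works within a single process, so a shared store such as the Django cache would be needed if several server processes handle the same user. Each revoked id still lands in the worker's revoked list, so the size limit above remains relevant.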

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Ahnaf Tahmid Chowdhury