'Terminate previous Celery task with same task id and run again if created
In my django project, I have made a view class by using TemplateView class. Again, I am using django channels and have made a consumer class too. Now, I am trying to use celery worker to pull queryset data whenever a user refreshes the page. But the problem is, if user again refreshes the page before the task gets finished, it create another task which causes overload.
Thus I have used revoke to terminate the previous running task. But I see, the revoke permanently revoked the task id. I don't know how to clear this. Because, I want to run the task again whenever user call it.
views.py
class Analytics(LoginRequiredMixin,TemplateView):
template_name = 'app/analytics.html'
login_url = '/user/login/'
def get_context_data(self, **kwargs):
app.control.terminate(task_id=self.request.user.username+'_analytics')
print(app.control.inspect().revoked())
context = super().get_context_data(**kwargs)
context['sub_title'] = 'Analytics'
return context
consumers.py
class AppConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
await self.accept()
analytics_queryset_for_selected_devices.apply_async(
args=[self.scope['user'].username],
task_id=self.scope['user'].username+'_analytics'
)
Solution 1:[1]
Right now I am solving the problem in this following way. In the consumers.py I made a disconnect function which revoke the task when the web socket get closed.
counter = 0
class AppConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
await self.accept()
analytics_queryset_for_selected_devices.apply_async(args=[self.scope['user'].username],
task_id=self.scope['user'].username+str(counter))
async def disconnect(self, close_code):
global counter
app.control.terminate(task_id=self.scope['user'].username+str(counter), signal='SIGKILL')
counter += 1
await self.close()
counter is used for making new unique task id. But in this method, for every request makes a new task id is added in the revoke list which cause load in memory. To minimize the issue I limited the revoke list size to 20.
from celery.utils.collections import LimitedSet
from celery.worker import state
state.revoked = LimitedSet(maxlen=20, expires=3600)
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Ahnaf Tahmid Chowdhury |
