Best approach to implement server-side caching on a Django GraphQL API?

I have an Angular frontend that uses the Apollo GraphQL client to talk to a Django GraphQL backend (built with graphene). It all works well, but the API is very slow, especially when there are concurrent users. I am trying a few things to speed up the API server. One of the things that I am seriously considering is server-side caching.

I'm aware that Django ships with some caching solutions out of the box, but these seem to be meant for caching views. Is it possible to use these methods to cache GraphQL API requests?

What is the most effective solution for caching the responses to GraphQL queries in Django?

I'm posting some of my project's code here so that you get a sense of how my API works.

This is a sample of the Query class containing the various GraphQL endpoints that serve data when queries are received from the client. I suspect this is where most of the optimization and caching will need to be applied:

class Query(ObjectType):
    # Root GraphQL query type: declares the public fields and the resolvers
    # that serve them.
    #
    # NOTE: this description is deliberately written as comments rather than a
    # class docstring, because graphene publishes an ObjectType's docstring as
    # the type description in the schema.

    # Public queries
    user_by_username = graphene.Field(PublicUserType, username=graphene.String())
    public_users = graphene.Field(
        PublicUsers,
        searchField=graphene.String(),
        membership_status_not=graphene.List(graphene.String),
        membership_status_is=graphene.List(graphene.String),
        roles=graphene.List(graphene.String),
        limit=graphene.Int(),
        offset=graphene.Int(),
    )

    public_institution = graphene.Field(PublicInstitutionType, code=graphene.String())
    public_institutions = graphene.Field(
        PublicInstitutions,
        searchField=graphene.String(),
        limit=graphene.Int(),
        offset=graphene.Int(),
    )

    public_announcement = graphene.Field(AnnouncementType, id=graphene.ID())
    public_announcements = graphene.List(
        AnnouncementType,
        searchField=graphene.String(),
        limit=graphene.Int(),
        offset=graphene.Int(),
    )

    @staticmethod
    def _paginate(items, limit=None, offset=None):
        """Return ``items`` sliced by ``offset`` and then ``limit``.

        A value of ``None`` skips the corresponding step. Works on lists and on
        unevaluated querysets (where the slices become SQL OFFSET/LIMIT).
        """
        if offset is not None:
            items = items[offset:]
        if limit is not None:
            items = items[:limit]
        return items

    def resolve_public_institution(root, info, code, **kwargs):
        """Return the public view of the active institution with ``code``.

        Returns ``None`` when no active institution has that code.
        """
        # .get() raises DoesNotExist instead of returning None, so the missing
        # row has to be handled as an exception.
        try:
            institution = Institution.objects.get(code=code, active=True)
        except Institution.DoesNotExist:
            return None
        return generate_public_institution(institution)

    def resolve_public_institutions(root, info, searchField=None, limit=None, offset=None, **kwargs):
        """Return a score-ranked page of public, active institutions.

        ``total`` is the number of matching institutions before pagination.
        """
        qs = Institution.objects.filter(public=True, active=True).order_by('-id')

        if searchField is not None:
            qs = qs.filter(Q(searchField__icontains=searchField.lower()))

        public_institutions = [
            generate_public_institution(institution) for institution in qs
        ]
        total = len(public_institutions)

        # Rank by score BEFORE paginating, otherwise each page is only sorted
        # internally and the overall ranking across pages is inconsistent.
        # The sort is stable, so ties keep the '-id' order from the query.
        public_institutions.sort(key=lambda x: x.score, reverse=True)
        page = Query._paginate(public_institutions, limit, offset)

        return PublicInstitutions(records=page, total=total)

    @login_required
    def resolve_institution_by_invitecode(root, info, invitecode, **kwargs):
        """Return the active institution with ``invitecode``, or ``None``."""
        try:
            return Institution.objects.get(invitecode=invitecode, active=True)
        except Institution.DoesNotExist:
            return None

    @login_required
    @user_passes_test(lambda user: has_access(user, RESOURCES['INSTITUTION'], ACTIONS['GET']))
    def resolve_institution(root, info, id, **kwargs):
        """Return the active institution ``id`` if the requester may access it.

        Returns ``None`` when ``is_record_accessible`` does not grant access.
        Raises ``Institution.DoesNotExist`` when no active row has that id.
        """
        current_user = info.context.user
        institution_instance = Institution.objects.get(pk=id, active=True)
        allow_access = is_record_accessible(
            current_user, RESOURCES['INSTITUTION'], institution_instance)
        # Kept as an equality test so only a result equal to True grants access.
        if allow_access != True:  # noqa: E712
            return None
        return institution_instance

    @login_required
    @user_passes_test(lambda user: has_access(user, RESOURCES['INSTITUTION'], ACTIONS['LIST']))
    def resolve_institutions(root, info, searchField=None, limit=None, offset=None, **kwargs):
        """Return a page of the institutions accessible to the requester.

        ``total`` is the number of matching institutions before pagination.
        """
        current_user = info.context.user
        qs = rows_accessible(current_user, RESOURCES['INSTITUTION'])

        if searchField is not None:
            qs = qs.filter(Q(searchField__icontains=searchField.lower()))

        # count() issues a COUNT query instead of loading every row just to
        # measure the result, and leaves qs unevaluated so the slices below
        # are executed by the database as OFFSET/LIMIT.
        total = qs.count()
        page = Query._paginate(qs, limit, offset)

        return Institutions(records=page, total=total)

    @login_required
    def resolve_user(root, info, id, **kwargs):
        """Return the redacted active user ``id``, or ``None`` if not found."""
        try:
            user_instance = User.objects.get(pk=id, active=True)
        except User.DoesNotExist:
            return None
        return redact_user(root, info, user_instance)

    def resolve_user_by_username(root, info, username, **kwargs):
        """Return the public profile of the active user ``username``.

        Raises ``GraphQLError`` when no active user has that username.
        """
        try:
            user = User.objects.get(username=username, active=True)
        except User.DoesNotExist:
            raise GraphQLError('User does not exist!') from None

        courses = Report.objects.filter(active=True, participant_id=user.id)
        user = redact_user(root, info, user)
        # Fall back to the role name when the user has no title of their own.
        title = user.title or user.role.name
        return PublicUserType(
            id=user.id,
            username=user.username,
            name=user.name,
            title=title,
            bio=user.bio,
            avatar=user.avatar,
            institution=user.institution,
            courses=courses,
        )

    def process_users(root, info, searchField=None, all_institutions=False, membership_status_not=None, membership_status_is=None, roles=None, unpaginated=False, limit=None, offset=None, **kwargs):
        """Shared implementation behind ``resolve_users`` and ``resolve_public_users``.

        Filters the members accessible to the requester, redacts them for
        non-admin requesters, orders them pending first, then uninitialized,
        then everyone else, and returns a ``Users`` result. ``total`` counts
        the users before pagination; when ``unpaginated`` is truthy ``limit``
        and ``offset`` are ignored.
        """
        current_user = info.context.user
        admin_user = is_admin_user(current_user)

        qs = rows_accessible(
            current_user, RESOURCES['MEMBER'], {'all_institutions': all_institutions})

        if searchField is not None:
            needle = searchField.lower()
            qs = qs.filter(
                Q(searchField__icontains=needle)
                | Q(username__icontains=needle)
                | Q(email__icontains=needle)
            )

        if membership_status_not:
            qs = qs.exclude(membership_status__in=membership_status_not)
        if membership_status_is:
            qs = qs.filter(membership_status__in=membership_status_is)
        if roles:
            qs = qs.filter(role__in=roles)

        if admin_user:
            redacted_users = qs
        else:
            # Non-admin requesters only ever see the redacted form of a user.
            redacted_users = [redact_user(root, info, user) for user in qs]

        pending = []
        uninitialized = []
        others = []
        for user in redacted_users:
            # NOTE(review): 'PENDINIG' looks like a typo of 'PENDING' - confirm
            # against the User.StatusChoices definition before renaming.
            if user.membership_status == User.StatusChoices.PENDINIG:
                pending.append(user)
            elif user.membership_status == User.StatusChoices.UNINITIALIZED:
                uninitialized.append(user)
            else:
                others.append(user)

        sorted_users = pending + uninitialized + others
        total = len(sorted_users)

        if not unpaginated:
            sorted_users = Query._paginate(sorted_users, limit, offset)

        return Users(records=sorted_users, total=total)

    @login_required
    def resolve_users(root, info, searchField=None, membership_status_not=None, membership_status_is=None, roles=None, limit=None, offset=None, **kwargs):
        """Return a page of the users accessible to the logged-in requester."""
        return Query.process_users(
            root,
            info,
            searchField=searchField,
            all_institutions=False,
            membership_status_not=membership_status_not,
            membership_status_is=membership_status_is,
            roles=roles,
            unpaginated=False,
            limit=limit,
            offset=offset,
            **kwargs
        )

    def resolve_public_users(root, info, searchField=None, membership_status_not=None, membership_status_is=None, roles=None, limit=None, offset=None, **kwargs):
        """Return a score-ranked page of public user profiles.

        Each user's score is the sum of ``completed * percentage`` over their
        active reports. ``total`` is the number of matching users before
        pagination.
        """
        # Fetch everything unpaginated: the ranking below needs every user's
        # score before a page can be cut out of it.
        results = Query.process_users(
            root,
            info,
            searchField=searchField,
            all_institutions=True,
            membership_status_not=membership_status_not,
            membership_status_is=membership_status_is,
            roles=roles,
            unpaginated=True,
            limit=limit,
            offset=offset,
            **kwargs
        )
        records = list(results.records)
        total = results.total

        # Load all relevant reports with ONE query and aggregate in Python,
        # instead of issuing a separate Report query for every user.
        scores = {}
        reports = Report.objects.filter(
            active=True, participant_id__in=[user.id for user in records])
        for report in reports:
            scores[report.participant_id] = (
                scores.get(report.participant_id, 0)
                + report.completed * report.percentage
            )

        # Only expose the subset of User fields that is meant to be public.
        public_users = [
            PublicUserType(
                id=user.id,
                username=user.username,
                name=user.name,
                title=user.title,
                bio=user.bio,
                avatar=user.avatar,
                institution=user.institution,
                score=scores.get(user.id, 0),
            )
            for user in records
        ]

        # Rank by score before paginating so pages follow the overall ranking.
        public_users.sort(key=lambda x: x.score, reverse=True)
        page = Query._paginate(public_users, limit, offset)

        return PublicUsers(records=page, total=total)

I'm not sure what other code you'll need to see to understand the setup, because the rest of the project is a typical Django application.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source