Django Keycloak integration flow

I am trying to follow the steps below to integrate Keycloak with a Django system without using the built-in contrib.auth app. (Image: enter image description here)

My question is:

  1. If I am not using the built-in User objects, how do I determine if a user is authenticated or not, by using the SSO session Keycloak generated?
  2. Step 5, associate it with a session, what does it mean?

Thanks!



Solution 1:[1]

This might be a little late but could be helpful to someone in the future.

The contrib.auth is useful when you want to implement Django Authentication only.

The given approach assumes you are using the users from Keycloak and don't have a Django User object to work with. If you are looking for the latter case, look at this package - https://django-keycloak.readthedocs.io/en/latest/

My use case was the integration of Keycloak with a React frontend and a REST (DRF) backend. For integration with Keycloak, you can write a custom middleware that is responsible for unpacking the access tokens generated by Keycloak and sent in the Authorization header on each request, and for validating permissions based on the client config file that you need to export from Keycloak for your respective client.

This is the middleware I used for my usecase

import re
import logging
from django.conf import settings
from django.http.response import JsonResponse
from django.utils.deprecation import MiddlewareMixin
from keycloak import KeycloakOpenID
from keycloak.exceptions import (
    KeycloakInvalidTokenError,
)
from rest_framework.exceptions import (
    PermissionDenied,
    AuthenticationFailed,
    NotAuthenticated,
)

# Module-level logger named after this module; used by the code below for
# debug and warning output.
logger = logging.getLogger(__name__)


class KeycloakMiddleware(MiddlewareMixin):
    """
    Custom KeyCloak Middleware for Authentication and Authorization.

    For every view that declares a ``keycloak_scopes`` mapping on its class
    (looked up through ``view_func.cls``), the token found in the
    ``Authorization`` header is handed to Keycloak and the permissions that
    come back are compared with the scope the view requires for the current
    HTTP method. Views without ``keycloak_scopes`` are passed through.

    Configuration is read from ``settings.KEYCLOAK_CONFIG``:

    * ``KEYCLOAK_SERVER_URL``, ``KEYCLOAK_CLIENT_ID``, ``KEYCLOAK_REALM``
      -- required.
    * ``KEYCLOAK_CLIENT_SECRET_KEY``, ``KEYCLOAK_CLIENT_PUBLIC_KEY``,
      ``KEYCLOAK_AUTHORIZATION_CONFIG`` -- optional, default ``None``.
    * ``KEYCLOAK_DEFAULT_ACCESS`` -- optional, default ``"DENY"``.
    * ``KEYCLOAK_METHOD_VALIDATE_TOKEN`` -- optional, default
      ``"INTROSPECT"``.

    Each of these values is exposed as a plain instance attribute
    (``config``, ``server_url``, ``client_id``, ``realm``,
    ``client_secret_key``, ``client_public_key``, ``default_access``,
    ``method_validate_token``, ``keycloak_authorization_config``) next to
    ``keycloak``, the ``KeycloakOpenID`` client built from them.
    """

    def __init__(self, get_response):
        """
        Read the Keycloak configuration and build the Keycloak client.

        :param get_response: next callable in the Django middleware chain
        :raises Exception: if one of the required configuration keys is
            missing from ``settings.KEYCLOAK_CONFIG``
        """
        # MiddlewareMixin stores get_response on the instance.
        super().__init__(get_response=get_response)

        self.config = settings.KEYCLOAK_CONFIG

        # Required settings
        try:
            self.server_url = self.config["KEYCLOAK_SERVER_URL"]
            self.client_id = self.config["KEYCLOAK_CLIENT_ID"]
            self.realm = self.config["KEYCLOAK_REALM"]
        except KeyError as missing_setting:
            raise Exception(
                "KEYCLOAK_SERVER_URL, KEYCLOAK_CLIENT_ID or KEYCLOAK_REALM not found."
            ) from missing_setting

        # Optional settings
        self.client_secret_key = self.config.get("KEYCLOAK_CLIENT_SECRET_KEY", None)
        self.client_public_key = self.config.get("KEYCLOAK_CLIENT_PUBLIC_KEY", None)
        self.default_access = self.config.get("KEYCLOAK_DEFAULT_ACCESS", "DENY")
        self.method_validate_token = self.config.get(
            "KEYCLOAK_METHOD_VALIDATE_TOKEN", "INTROSPECT"
        )
        self.keycloak_authorization_config = self.config.get(
            "KEYCLOAK_AUTHORIZATION_CONFIG", None
        )

        # Create Keycloak instance
        self.keycloak = KeycloakOpenID(
            server_url=self.server_url,
            client_id=self.client_id,
            realm_name=self.realm,
            client_secret_key=self.client_secret_key,
        )

        # Read policies
        if self.keycloak_authorization_config:
            self.keycloak.load_authorization_config(self.keycloak_authorization_config)

    def __call__(self, request):
        """
        Pass the request straight on to the next middleware or view.

        :param request: django request
        :return: whatever ``get_response`` returns for this request
        """
        return self.get_response(request)

    @staticmethod
    def _extract_token(header_value):
        """
        Pull the token out of a raw ``Authorization`` header value.

        :param header_value: header content, e.g. ``"Bearer <token>"``
        :return: the second word when the header has exactly two words,
            otherwise the first word; ``None`` when the header is empty or
            contains only whitespace
        """
        parts = header_value.split()
        if not parts:
            return None
        return parts[1] if len(parts) == 2 else parts[0]

    def process_view(self, request, view_func, view_args, view_kwargs):
        # pylint: disable=unused-argument
        """
        Authorize the request against the ``keycloak_scopes`` of the view.

        :param request: django request
        :param view_func: view about to be called; its ``cls`` attribute is
            inspected for ``keycloak_scopes``
        :param view_args: view args (unused)
        :param view_kwargs: view kwargs (unused)
        :return: ``None`` to let the request continue to the view, or a
            ``JsonResponse`` carrying the DRF ``NotAuthenticated``,
            ``AuthenticationFailed`` or ``PermissionDenied`` status code and
            default detail when the request is rejected
        """

        # Paths matching any configured regex bypass the middleware entirely.
        if hasattr(settings, "KEYCLOAK_BEARER_AUTHENTICATION_EXEMPT_PATHS"):
            path = request.path_info.lstrip("/")

            if any(re.match(m, path) for m in settings.KEYCLOAK_BEARER_AUTHENTICATION_EXEMPT_PATHS):
                logger.debug("** exclude path found, skipping")
                return None

        try:
            view_scopes = view_func.cls.keycloak_scopes
        except AttributeError:
            # Either the view is not class based (no ``cls``) or it declares
            # no ``keycloak_scopes``: nothing to enforce.
            logger.debug(
                "Allowing free access, since no authorization configuration "
                "(keycloak_scopes) found for this request route :%s",
                request,
            )
            return None

        # A missing header and an empty header are both "no credentials";
        # indexing the split result of an empty header would raise IndexError.
        token = self._extract_token(request.META.get("HTTP_AUTHORIZATION", ""))
        if token is None:
            return JsonResponse(
                {"detail": NotAuthenticated.default_detail},
                status=NotAuthenticated.status_code,
            )

        # Scope for this HTTP method, falling back to the "DEFAULT" entry.
        required_scope = view_scopes.get(request.method) or view_scopes.get("DEFAULT")

        # DEFAULT scope not found and DEFAULT_ACCESS is DENY
        if not required_scope and self.default_access == "DENY":
            return JsonResponse(
                {"detail": PermissionDenied.default_detail},
                status=PermissionDenied.status_code,
            )
        # NOTE(review): when default_access is not "DENY" and no scope is
        # configured, required_scope stays falsy and the membership test
        # below can only succeed if a permission's scopes contain that
        # value -- confirm whether such requests were meant to be allowed.

        # Decode options handed to Keycloak together with the token:
        # signature and expiry checks on, audience check off.
        options = {
            "verify_signature": True,
            "verify_aud": False,
            "verify_exp": True,
        }
        try:
            user_permissions = self.keycloak.get_permissions(
                token,
                method_token_info=self.method_validate_token.lower(),
                key=self.client_public_key,
                options=options,
            )
        except KeycloakInvalidTokenError:
            return JsonResponse(
                {"detail": AuthenticationFailed.default_detail},
                status=AuthenticationFailed.status_code,
            )

        # One permission carrying the required scope is enough.
        if any(required_scope in perm.scopes for perm in user_permissions):
            return None

        # User Permission Denied
        return JsonResponse(
            {"detail": PermissionDenied.default_detail},
            status=PermissionDenied.status_code,
        )

With this approach, you can define policy authorization straight inside your view class similar to this.

class GetMockData(APIView):
    """
    GET endpoint that returns mock medicine data.

    ``keycloak_scopes`` maps the HTTP method to the scope string declared
    for it on this view.
    """

    keycloak_scopes = {"GET": "medicine:view"}

    def get(self, request: HttpRequest) -> HttpResponse:
        """
        Return all medicines in serialized form.

        Responds with a failure payload and HTTP 404 when
        ``get_all_medicines()`` yields nothing.
        """
        medicines = get_all_medicines()

        if medicines:
            logger.warning("Testing a fake warning >>>>>>>> WARNING <<<<<<<<?")
            serialized = MedicineSerializer(medicines, many=True).data
            return CustomResponse(success=True, payload=serialized).send_response()

        # Nothing to return: report it as "not found".
        not_found = CustomResponse(
            success=False,
            payload=None,
            error=E_RANGE_MESSAGE,
            status=status.HTTP_404_NOT_FOUND,
        )
        return not_found.send_response()

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 era5tone