How to refresh a token and burn (revoke) a token on logout in FastAPI?

I use this code for authentication.

#1. auth_handler.py

import time

from decouple import config
import jwt

# Signing key and algorithm name used by the JWT helpers in this module;
# both values are read through decouple's config().
JWT_SECRET = config("JWT_SECRET")
JWT_ALGORITHM = config("JWT_ALGORITHM")


def response_token(token: str) -> dict:
    """
        Wrap a token in the response payload.

        Returns a dict that holds the given token under the
        "access_token" key.
    """
    wrapped = dict(access_token=token)
    return wrapped


def sign_jwt(user_id: "str | int", validity: int) -> dict:
    """
        Create a signed JWT for a user and wrap it for the response.

        user_id: identifier stored in the token payload under "user_id".
        validity: lifetime in seconds; it is added to the current time
            and stored in the payload under "expiry".

        Returns the dict built by response_token, i.e.
        {"access_token": <encoded token>}.
    """
    # The annotation is written as a string because `str or int` evaluates
    # to plain `str` at definition time and never covered the int case.
    payload = {
        "user_id": user_id,
        "expiry": time.time() + validity
    }

    token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)

    # response_token already builds the {"access_token": ...} wrapper, so
    # pass the bare token; wrapping it here as well nested the token one
    # level too deep.
    return response_token(token)


def decode_jwt(token: str) -> dict:
    """
        Decode a JWT and return its payload while it is still valid.

        token: the encoded JWT string.

        Returns the decoded payload when its "expiry" value is not in the
        past. Returns an empty dict when the token cannot be decoded, has
        no usable "expiry" entry, or has expired.
    """
    try:
        decoded_token = jwt.decode(
            token,
            JWT_SECRET,
            algorithms=[JWT_ALGORITHM]
        )
        # Expiry is tracked in the custom "expiry" field written by the
        # signing helper, so it has to be compared by hand here.
        if decoded_token["expiry"] >= time.time():
            return decoded_token
        return {}
    except Exception:
        # Any decoding or validation failure means the token is unusable.
        # Unlike a bare `except:`, this lets KeyboardInterrupt and
        # SystemExit propagate.
        return {}

and

#2. auth_bearer.py

from fastapi import Request, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from . import auth_handler


class JWTBearer(HTTPBearer):
    """
        Bearer-token dependency that additionally validates the JWT.

        On top of HTTPBearer's header parsing, the token is accepted only
        when auth_handler.decode_jwt returns a non-empty payload for it.
    """

    def __init__(self, auto_error: bool = True):
        """
            Forward auto_error unchanged to HTTPBearer.
        """
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request):
        """
            Extract and validate the bearer token of a request.

            Returns the raw token string when it is valid.

            Raises HTTPException with status 403 when no credentials are
            available, when the scheme is not "Bearer", or when the token
            fails verify_jwt.
        """
        credentials: HTTPAuthorizationCredentials = await super().__call__(request)

        # Guard clauses: reject early, keep the success path unindented.
        if not credentials:
            raise HTTPException(
                status_code=403,
                detail="Invalid Authorization Code."
            )
        if credentials.scheme != "Bearer":
            raise HTTPException(
                status_code=403,
                detail="Invalid Authentication Scheme."
            )
        if not self.verify_jwt(credentials.credentials):
            raise HTTPException(
                status_code=403,
                detail="Invalid Token OR Expired Token."
            )
        return credentials.credentials

    def verify_jwt(self, jwt_token: str) -> bool:
        """
            Report whether a token decodes to a non-empty payload.

            Returns True when auth_handler.decode_jwt yields a truthy
            payload, False when the payload is empty or decoding raised.
        """
        try:
            payload = auth_handler.decode_jwt(jwt_token)
        except Exception:
            # Treat any decoding error as an invalid token, but do not
            # swallow KeyboardInterrupt/SystemExit as a bare except would.
            payload = None
        return bool(payload)

To secure the routes, I'll leverage dependency injection via FastAPI's Depends.

from fastapi import FastAPI, Body, Depends
from schemas import UserSchema, UserLoginSchema
from auth_bearer import JWTBearer
from auth_handler import signJWT

# Application object; the @app.post decorators below register routes on it.
app = FastAPI()

def check_user(data: UserLoginSchema):
    """
        Tell whether the submitted credentials match a stored user.

        Returns True when some entry in users has the same email and the
        same password as data, otherwise False.
    """
    return any(
        candidate.email == data.email and candidate.password == data.password
        for candidate in users
    )

@app.post("/user/signup", tags=["user"])
async def create_user(user: UserSchema = Body(...)):
    """
        Register a new user and return the result of signJWT for them.

        The submitted user is appended to users and signJWT is called
        with the user's email.
    """
    # replace with db call, making sure to hash the password first
    users.append(user)
    # NOTE(review): auth_handler as shown defines sign_jwt(user_id, validity),
    # not signJWT(email) - confirm which helper/signature is intended.
    signed = signJWT(user.email)
    return signed


@app.post("/user/login", tags=["user"])
async def user_login(user: UserLoginSchema = Body(...)):
    """
        Log a user in.

        Returns the result of signJWT for the user's email when
        check_user accepts the credentials, otherwise a dict with an
        "error" message.
    """
    if not check_user(user):
        return {"error": "Wrong login details!"}
    # NOTE(review): auth_handler as shown defines sign_jwt(user_id, validity),
    # not signJWT(email) - confirm which helper/signature is intended.
    return signJWT(user.email)

@app.post("/posts", dependencies=[Depends(JWTBearer())], tags=["posts"])
async def add_post(post: PostSchema) -> dict:
    """
        Store a new post; the route is guarded by the JWTBearer dependency.

        The post gets an id one greater than the current number of stored
        posts, is appended to posts as a dict, and a confirmation message
        is returned.
    """
    next_id = len(posts) + 1
    post.id = next_id
    posts.append(post.dict())
    return dict(data="post added.")

This code works for sign-in and for authorizing requests correctly.

However, I want to add refresh-token functionality, and a way to burn (invalidate) a token when the user logs out.

How can I implement these features?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source