Explore How to Effectively Use JWT With FastAPI

Written by aanchlia | Published 2023/08/29
Tech Story Tags: python | fastapi | jwt-token | hashing | security | api | oauth2 | swagger

TLDRIn this article we talk about how to implement JWT with FastAPI. There are also code snippets for easy understanding.via the TL;DR App

FastAPI is a modern Python web framework for building APIs. It is fast, easy to use, and enables the creation of robust and scalable APIs.

Some of its features are:

  • Uses Starlette and Pydantic under the hood for blazing-fast performance.

  • Minimizes boilerplate with a simple declaration of routes and inputs.

  • Great editor support and easy-to-remember syntax.

  • It automatically generates OpenAPI schemas and docs.

If you want to learn more, check out the FastAPI docs. In this article, we will talk about how FastAPI endpoints can be secured using JWT.


JWT(JSON Web Token) provides a very secure way to authenticate endpoints. Some of its features are:

  • JSON-encoded access tokens that are cryptographically signed.

  • Contains claims like issuer, expiry, subject, etc.

  • Verifiable as they are signed using a secret key.

  • Useful for securely transmitting information between parties.

If you want to learn more about JWT and see how they work, check out jwt.io


After covering these basics, let’s get started with the coding part.

Install fastapi, uvicorn, python-jose, and passlib.

$ pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt]

Uvicorn is an ASGI web server for Python. It is the recommended server for FastAPI.

Python-jose to generate and verify the JWT tokens. You can also use PyJWT.

Passlib handles password hashes.

Import necessary packages:

from datetime import datetime, timedelta
from typing import Union
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from typing_extensions import Annotated

Define test db and secret.

test_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "[email protected]",
        "hashed_password": "$2b$12$PQO42dMponRoPGWyt5co5OiZCQJcpKDFxet8n5MnkUHNNHAg/mioS",
    }
}

# openssl rand -hex 32
SECRET_KEY = "380e91edcddfe3c0733585523c7bf4b80c9e6d720787142ed9da5e6c37a29185"
ALGORITHM = "HS256"
TOKEN_EXPIRE_MINUTES = 30  

This SECRET_KEY will be used to sign JWT. Don’t use this key; generate a new one. To generate a new key, run this in the terminal.

$ openssl rand -hex 32

Algorithm will be HS256(HMAC with SHA-256).

Create models:

class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None


class UserInDB(User):
    hashed_password: str

Before we move ahead, let’s first talk about the password workflow in Layman’s terms.

  • A user will enter a username and password and click login.

  • The client will make an API call with that username and password, and the backend will verify whether the username exists and the password matches with the one in DB.

  • The backend will generate a token (jwt for this article) with an expiry date and return it to the client.

  • After login, the user will use that token in the Authorization header to make subsequent API calls.

Now, create passlib’s CryptContext instance and oauth scheme.

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

When we create an instance of the OAuth2PasswordBearer class we pass in the tokenUrl parameter. This parameter contains the URL that the client (the frontend running in the user's browser) will use to send the username and password in order to get a token.

FastAPI Security

Here, tokenUrl is a relative URL.

Now, create the login endpoint. The client will call this endpoint to authenticate.

@app.post("/login", response_model=Token)
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    """Login method to authenticate"""
    
    user = authenticate_user(test_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    return {"access_token": access_token, "token_type": "bearer"}


def authenticate_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user or not verify_password(password, user.hashed_password):
        return False
    return user


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def create_access_token(data: dict, expires_delta: timedelta):
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})

    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

Here, we are using OAuth2PasswordRequestForm, which is a class dependency provided by FastAPI. It has a form body with a username, password, and some other fields that we don’t need for this article.

Let’s go through the rest of the code.

  • The client calls the login method, which takes the username and password and tries to find the user in DB, then it matches the password with the one in db using passlib.

  • After verifying the request, it creates a jwt token with username and expiry, which it returns to the user.

The create_access_token method creates an encoded jwt that uses SECRET_KEY, ALGORITHM, and to_encode data, which has a username and expiry to generate the token.

Let’s try to fetch the current user after logging in.

@app.get("/user/current/", response_model=User)
async def current_user_profile(current_user: Annotated[User, Depends(get_current_user)]):
    """Fetch current user"""

    return current_user

This endpoint would return current user data.

It all looks good so far, but we need a way to test it.

Run this in the terminal:

$ uvicorn main:app --reload

And head to http://127.0.0.1:8000/docs

You should see something like this:

Click on Authorize, and a pop-up will open.

Add testuser and password as password and click Authorize. It would authorize the user. After this, try the /user/current/ endpoint to fetch the current user data. Click on Try it Out to run it. You can open the Chrome dev tools, and check that it only sent the bearer token this time.

If you have made it so far, Congratulations!

Here’s the full code below:

from datetime import datetime, timedelta
from typing import Union
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from typing_extensions import Annotated

# openssl rand -hex 32
SECRET_KEY = "380e91edcddfe3c0733585523c7bf4b80c9e6d720787142ed9da5e6c37a29185"
ALGORITHM = "HS256"
TOKEN_EXPIRE_MINUTES = 30

test_db = {
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "email": "[email protected]",
        "hashed_password": "$2b$12$PQO42dMponRoPGWyt5co5OiZCQJcpKDFxet8n5MnkUHNNHAg/mioS",
    }
}

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Union[str, None] = None

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None

class UserInDB(User):
    hashed_password: str

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

app = FastAPI()

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user or not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: timedelta):
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})

    # use jwt to create a token   
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            return status.HTTP_401_UNAUTHORIZED
        token_data = TokenData(username=username)
    except JWTError:
        return JWTError
    user = get_user(test_db, username=token_data.username)
    return user

@app.post("/login", response_model=Token)
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    """Login method to authenticate"""

    user = authenticate_user(test_db, form_data.username, form_data.password)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = timedelta(minutes=TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/user/current/", response_model=User)
async def current_user_profile(current_user: Annotated[User, Depends(get_current_user)]):
    """Fetch current user"""
    
    return current_user

Feel free to reach out in comments for any questions or suggestions.

Thank you!


[1] FastAPI documentation — FastAPI Security

https://fastapi.tiangolo.com/tutorial/security/


Written by aanchlia | Ankit is a highly accomplished software development professional with over a decade of experience in the industry.
Published by HackerNoon on 2023/08/29