Secure FastAPI Implementation

Task: Implement comprehensive security for a FastAPI application

Core Security Features:

  1. JWT Authentication:
# Use python-jose, passlib[bcrypt]
- SECRET_KEY from environment (openssl rand -hex 32)
- HS256 algorithm
- 30-minute token expiry
- Bcrypt password hashing
- HTTPBearer security scheme
  2. Key Functions:
  • verify_password(plain, hashed) - bcrypt verification
  • get_password_hash(password) - bcrypt hashing
  • create_access_token(data, expires_delta) - JWT creation with exp and iat claims
  • get_current_user(credentials) - Validate JWT and return user dict
  3. RBAC System:
  • Roles: ADMIN, MODERATOR, USER
  • Permissions: READ, WRITE, DELETE, MANAGE_USERS
  • Dependency factory: require_permissions([Permission.WRITE]), used with Depends()
  • Check user permissions before route execution
  4. Rate Limiting:
  • Use slowapi library + Redis
  • Pattern: @limiter.limit("5/minute")
  • Return 429 status when exceeded
  • Store in Redis with TTL
  5. Security Headers (add via middleware):
X-Frame-Options: DENY
X-Content-Type-Options: nosniff
Strict-Transport-Security: max-age=31536000
Content-Security-Policy: default-src 'self'
  6. Input Validation with Pydantic:
  • Email: EmailStr type
  • Password: min 8 chars, require uppercase, lowercase, digit, special char
  • Username: constr(min_length=3, max_length=50), letters, digits, and underscores only
  • Use Pydantic validator decorators for custom validation
  7. CORS Configuration:
CORSMiddleware(
    allow_origins=["https://yourdomain.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"]
)
  8. SQL Injection Prevention:
  • Use parameterized queries: database.fetch_one(query, values={"param": value})
  • NEVER use f-strings for SQL queries
  9. Environment Variables:
  • Use pydantic-settings BaseSettings
  • Store: SECRET_KEY, DATABASE_URL, REDIS_URL, ALLOWED_ORIGINS
  • Never commit .env files
  10. Security Logging:
  • Log all authentication events (success/failure)
  • Include: timestamp, IP, user_agent, event_type
  • Use JSON logging for structured logs

Routes to Implement:

  • POST /auth/register - Create user with hashed password
  • POST /auth/login - Return JWT token
  • GET /api/protected - Requires valid JWT
  • DELETE /users/:id - Requires MANAGE_USERS permission

Use this specification to build a production-ready secure API.

FastAPI is an excellent framework for building APIs, but like any web application, it requires careful security implementation. In this comprehensive guide, we’ll cover essential security practices to protect your FastAPI applications from common vulnerabilities.

Security Fundamentals

Before diving into implementation, let’s understand the key security principles:

OWASP Top 10

We’ll address multiple OWASP Top 10 vulnerabilities including: Broken Authentication, Injection, Security Misconfiguration, and Insufficient Logging & Monitoring.

The Security Layers

  1. Authentication - Who are you?
  2. Authorization - What can you do?
  3. Data Protection - Encryption & validation
  4. Rate Limiting - Prevent abuse
  5. Monitoring - Detect threats

1. Authentication with JWT

Let’s implement secure JWT authentication:

# security/auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta, timezone
from typing import Optional
import os

# Configuration
# Generate with: openssl rand -hex 32. os.environ[...] raises KeyError at
# startup if the variable is missing, instead of silently using None.
SECRET_KEY = os.environ["SECRET_KEY"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify password against hash"""
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    """Hash a password"""
    return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create JWT access token"""
    to_encode = data.copy()
    # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
    now = datetime.now(timezone.utc)

    # Fall back to the configured lifetime when no explicit delta is given
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode.update({"exp": now + expires_delta, "iat": now})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

    return encoded_jwt

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
    """Validate JWT and return current user"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        token = credentials.credentials
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")

        if username is None:
            raise credentials_exception

        # In production, fetch user from database
        return {"username": username, "user_id": payload.get("user_id")}

    except JWTError:
        raise credentials_exception
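
To make the exp claim and the signature check concrete, here is a minimal standard-library sketch of what an HS256 token contains and how verification rejects tampered or expired tokens. It is an illustration only, not a replacement for python-jose; the names sign_hs256 and verify_hs256 are hypothetical and do not appear in the code above.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    """Base64url without padding, as used by JWT."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _encode_part(part: dict) -> str:
    return _b64url(json.dumps(part, separators=(",", ":")).encode())


def sign_hs256(claims: dict, secret: str) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    signing_input = _encode_part({"alg": "HS256", "typ": "JWT"}) + "." + _encode_part(claims)
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + _b64url(signature)


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Return the claims, or raise ValueError if the token is invalid."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = parts

    # The algorithm is fixed here; it is never read from the token header.
    expected = hmac.new(
        secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
    ).digest()
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")

    claims = json.loads(_b64url_decode(payload_b64))
    current = time.time() if now is None else now
    # Tokens without an exp claim are treated as already expired.
    if claims.get("exp", 0) <= current:
        raise ValueError("token expired")
    return claims


token = sign_hs256({"sub": "alice", "iat": 1000, "exp": 2000}, "k" * 64)
print(verify_hs256(token, "k" * 64, now=1500)["sub"])
```

Passing algorithms=[ALGORITHM] to jwt.decode, as get_current_user does, plays the same role as the fixed algorithm in this sketch: the server decides how tokens are verified, not the token.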
Secret Key Management

Never hardcode secrets! Use environment variables or a secrets manager like AWS Secrets Manager, HashiCorp Vault, or Azure Key Vault.
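
As a rough sketch of that advice, the helpers below generate a key from Python (comparable to openssl rand -hex 32) and refuse to start when SECRET_KEY is missing or suspiciously short. The function names and the 64-character minimum are assumptions made for illustration.

```python
import os
import secrets
from typing import Mapping


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return a random hex string (two hex characters per byte)."""
    return secrets.token_hex(num_bytes)


def load_secret_key(env: Mapping[str, str] = os.environ, min_length: int = 64) -> str:
    """Read SECRET_KEY from the environment and fail fast if it looks wrong."""
    key = env.get("SECRET_KEY")
    if not key:
        raise RuntimeError("SECRET_KEY is not set")
    if len(key) < min_length:
        raise RuntimeError("SECRET_KEY is shorter than expected")
    return key


# Example: generate a key once, store it in your secrets manager or .env file
example_key = generate_secret_key()
print(len(example_key))
```

Failing at startup is usually easier to diagnose than a 500 error on the first login request.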

Login Endpoint

# routes/auth.py
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, EmailStr
from security.auth import (
    ACCESS_TOKEN_EXPIRE_MINUTES,
    verify_password,
    create_access_token,
    get_password_hash,
)
import database  # Your database module

router = APIRouter(prefix="/auth", tags=["authentication"])

class UserLogin(BaseModel):
    email: EmailStr
    password: str

class UserRegister(BaseModel):
    email: EmailStr
    password: str
    username: str

class Token(BaseModel):
    access_token: str
    token_type: str

@router.post("/login", response_model=Token)
async def login(user: UserLogin):
    """Authenticate user and return JWT token"""

    # Fetch user from database
    db_user = await database.get_user_by_email(user.email)

    if not db_user or not verify_password(user.password, db_user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Create access token
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": db_user.email, "user_id": db_user.id},
        expires_delta=access_token_expires
    )

    return {"access_token": access_token, "token_type": "bearer"}

@router.post("/register", response_model=Token)
async def register(user: UserRegister):
    """Register new user"""

    # Check if user exists
    existing_user = await database.get_user_by_email(user.email)
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    # Create user
    hashed_password = get_password_hash(user.password)
    new_user = await database.create_user(
        email=user.email,
        username=user.username,
        hashed_password=hashed_password
    )

    # Generate token
    access_token = create_access_token(
        data={"sub": new_user.email, "user_id": new_user.id}
    )

    return {"access_token": access_token, "token_type": "bearer"}

2. Role-Based Access Control (RBAC)

Implement permission-based authorization:

# security/permissions.py
from enum import Enum
from fastapi import Depends, HTTPException, status
from typing import List

from security.auth import get_current_user

class Role(str, Enum):
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"

class Permission(str, Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    MANAGE_USERS = "manage_users"

# Role-Permission mapping
ROLE_PERMISSIONS = {
    Role.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.MANAGE_USERS],
    Role.MODERATOR: [Permission.READ, Permission.WRITE, Permission.DELETE],
    Role.USER: [Permission.READ, Permission.WRITE]
}

def require_permissions(required_permissions: List[Permission]):
    """Build a dependency that checks the current user's permissions"""

    async def permission_checker(current_user: dict = Depends(get_current_user)):
        # get_current_user must populate "role" (e.g. from the database);
        # users without one are treated as Role.USER
        user_role = current_user.get("role", Role.USER)

        # Unknown roles map to no permissions (deny by default)
        user_permissions = ROLE_PERMISSIONS.get(user_role, [])

        # Check if user has all required permissions
        for permission in required_permissions:
            if permission not in user_permissions:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Permission denied: requires {permission.value}"
                )

        return current_user

    return permission_checker

# Usage in routes
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    current_user: dict = Depends(require_permissions([Permission.MANAGE_USERS]))
):
    """Delete user - requires MANAGE_USERS permission"""
    await database.delete_user(user_id)
    return {"message": "User deleted successfully"}
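
The permission check itself does not depend on FastAPI, so it can be sketched and unit-tested as a plain function. The version below is a simplified illustration under that assumption; missing_permissions is a hypothetical helper, and the role-to-permission table mirrors ROLE_PERMISSIONS above.

```python
from enum import Enum


class Role(str, Enum):
    ADMIN = "admin"
    MODERATOR = "moderator"
    USER = "user"


class Permission(str, Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    MANAGE_USERS = "manage_users"


ROLE_PERMISSIONS = {
    Role.ADMIN: frozenset(Permission),  # every permission
    Role.MODERATOR: frozenset({Permission.READ, Permission.WRITE, Permission.DELETE}),
    Role.USER: frozenset({Permission.READ, Permission.WRITE}),
}


def missing_permissions(role, required):
    """Return the required permissions the role lacks (empty set means allowed)."""
    try:
        granted = ROLE_PERMISSIONS[Role(role)]
    except ValueError:
        granted = frozenset()  # unknown role: deny everything
    return set(required) - granted


print(missing_permissions("user", [Permission.WRITE, Permission.DELETE]))
```

Returning the missing set rather than a boolean makes it straightforward to report exactly which permission was denied in the 403 response.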

3. Rate Limiting

Protect against abuse with rate limiting:

# middleware/rate_limit.py
from fastapi import Request, HTTPException, status
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
import redis.asyncio as redis

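# Initialize rate limiter. Without storage_uri, slowapi keeps counters in
# process memory; pointing it at Redis shares limits across instances.
limiter = Limiter(key_func=get_remote_address, storage_uri="redis://localhost:6379")

# Redis client for the manual rate_limit_check helper below
redis_client = redis.from_url("redis://localhost:6379")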

async def rate_limit_check(request: Request, key: str, limit: int, window: int):
    """
    Check rate limit using Redis
    Args:
        key: Unique identifier (IP, user ID, API key)
        limit: Max requests allowed
        window: Time window in seconds
    """
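    current = await redis_client.incr(key)

    if current == 1:
        # First request in this window: start the expiry clock
        await redis_client.expire(key, window)

    if current > limit:
        # Report the time actually left in the window, not the full window
        retry_after = max(await redis_client.ttl(key), 0)
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"Rate limit exceeded. Try again in {retry_after} seconds.",
            headers={"Retry-After": str(retry_after)},
        )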

# Apply to FastAPI app
from fastapi import FastAPI

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Usage in routes

@app.get("/api/data")
@limiter.limit("5/minute")  # 5 requests per minute
async def get_data(request: Request):
    return {"data": "sensitive information"}
Production Rate Limiting

For production, use Redis or Memcached for distributed rate limiting across multiple servers. In-memory limiting only works for single-instance deployments.
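
The counting logic behind a fixed-window limit is the same whether the counter lives in Redis or in memory. The sketch below models the INCR plus EXPIRE pattern with a dictionary and an injectable clock so the behaviour can be checked without a Redis server. FixedWindowLimiter is a hypothetical class for illustration and, being in-memory, only suits a single process.

```python
import math
import time
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    """In-memory model of the Redis INCR/EXPIRE fixed-window pattern."""

    def __init__(self, limit: int, window: int, clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock
        self._counters: Dict[str, Tuple[int, float]] = {}  # key -> (count, expires_at)

    def hit(self, key: str) -> Tuple[bool, int]:
        """Record one request and return (allowed, seconds_until_reset)."""
        now = self.clock()
        count, expires_at = self._counters.get(key, (0, 0.0))
        if now >= expires_at:
            # Window elapsed: behaves like the Redis key having expired
            count, expires_at = 0, now + self.window
        count += 1
        self._counters[key] = (count, expires_at)
        retry_after = max(0, math.ceil(expires_at - now))
        return count <= self.limit, retry_after


# Drive the limiter with a fake clock so no real waiting is needed
fake_now = [100.0]
demo_limiter = FixedWindowLimiter(limit=5, window=60, clock=lambda: fake_now[0])
outcomes = [demo_limiter.hit("203.0.113.7")[0] for _ in range(6)]
print(outcomes)
```

A fixed window allows short bursts at window boundaries; sliding-window or token-bucket schemes smooth that out at the cost of more state.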

4. CORS Configuration

Properly configure Cross-Origin Resource Sharing:

# main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

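# DEVELOPMENT - Allow all origins (NOT for production!)
# Wildcards cannot be combined with allow_credentials=True, so credentials
# stay off in this variant.
# app.add_middleware(
#     CORSMiddleware,
#     allow_origins=["*"],
#     allow_credentials=False,
#     allow_methods=["*"],
#     allow_headers=["*"],
# )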

# PRODUCTION - Restrict origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://yourdomain.com",
        "https://app.yourdomain.com",
    ],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
    max_age=3600,  # Cache preflight requests for 1 hour
)
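
An explicit origin list works by exact string match, which is why lookalike domains and scheme changes are rejected. The function below is a simplified model of that decision, not Starlette's actual implementation; cors_headers_for is a hypothetical name.

```python
ALLOWED_ORIGINS = frozenset({
    "https://yourdomain.com",
    "https://app.yourdomain.com",
})


def cors_headers_for(origin, allowed=ALLOWED_ORIGINS):
    """Return CORS response headers for an allowed origin, or {} otherwise."""
    if origin not in allowed:
        return {}  # no CORS headers: the browser blocks the cross-origin read
    return {
        "Access-Control-Allow-Origin": origin,  # echo the exact origin, never "*"
        "Access-Control-Allow-Credentials": "true",
        "Vary": "Origin",  # caches must not reuse this response for other origins
    }


print(cors_headers_for("https://yourdomain.com.evil.example"))
```

Substring or prefix checks on the Origin header are a common source of CORS bypasses, so an exact-match allowlist is the safer default.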

5. Input Validation & SQL Injection Prevention

FastAPI and Pydantic together provide strong input validation:

from pydantic import BaseModel, EmailStr, constr, field_validator
from typing import Optional
import re

class UserCreate(BaseModel):
    email: EmailStr  # Automatically validates email format
    username: constr(min_length=3, max_length=50)  # Length constraints
    password: constr(min_length=8)
    bio: Optional[constr(max_length=500)] = None

    # field_validator is the Pydantic v2 replacement for @validator
    @field_validator('password')
    @classmethod
    def password_strength(cls, v):
        """Ensure strong password"""
        if not re.search(r'[A-Z]', v):
            raise ValueError('Password must contain uppercase letter')
        if not re.search(r'[a-z]', v):
            raise ValueError('Password must contain lowercase letter')
        if not re.search(r'[0-9]', v):
            raise ValueError('Password must contain digit')
        if not re.search(r'[^A-Za-z0-9]', v):
            raise ValueError('Password must contain special character')
        return v

    @field_validator('username')
    @classmethod
    def username_allowed_chars(cls, v):
        """Only allow letters, digits, and underscores"""
        # fullmatch avoids the trailing-newline loophole of '$' with re.match
        if not re.fullmatch(r'[a-zA-Z0-9_]+', v):
            raise ValueError('Username may only contain letters, digits, and underscores')
        return v

# SQL Injection Prevention with SQLAlchemy/Databases
from databases import Database

database = Database("postgresql://user:pass@localhost/db")

# SAFE - Parameterized query
async def get_user(email: str):
    query = "SELECT * FROM users WHERE email = :email"
    return await database.fetch_one(query=query, values={"email": email})

# UNSAFE - String interpolation (DON'T DO THIS!)
# async def get_user_unsafe(email: str):
#     query = f"SELECT * FROM users WHERE email = '{email}'"  # VULNERABLE!
#     return await database.fetch_one(query=query)
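
The difference between the two query styles can be demonstrated end to end with the standard-library sqlite3 module. This is a self-contained illustration using an in-memory database and a classic injection payload; the table and function names are made up for the demo, and the unsafe function exists only to show the failure mode.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO users (email) VALUES (?)",
    [("alice@example.com",), ("bob@example.com",)],
)

# Attacker-controlled input that tries to turn the WHERE clause into a tautology
payload = "nobody@example.com' OR '1'='1"


def find_users_safe(email):
    """Parameterized: the driver treats the payload as a plain string value."""
    query = "SELECT email FROM users WHERE email = :email"
    return conn.execute(query, {"email": email}).fetchall()


def find_users_unsafe(email):
    """Deliberately vulnerable: the payload becomes part of the SQL text."""
    query = f"SELECT email FROM users WHERE email = '{email}'"
    return conn.execute(query).fetchall()


print(len(find_users_safe(payload)), len(find_users_unsafe(payload)))
```

With the parameterized query the payload matches no rows; with string interpolation it matches every row in the table.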

6. Security Headers

Add security headers to all responses:

# middleware/security_headers.py
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)

        # Prevent clickjacking
        response.headers["X-Frame-Options"] = "DENY"

        # Prevent MIME sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"

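        # Disable the legacy XSS auditor; modern browsers ignore this header,
        # and OWASP recommends "0" because the old filter could itself be abused
        response.headers["X-XSS-Protection"] = "0"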

        # Strict Transport Security (HTTPS only)
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"

        # Content Security Policy
        response.headers["Content-Security-Policy"] = "default-src 'self'"

        # Referrer Policy
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # Permissions Policy
        response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"

        return response

# Add to app
app.add_middleware(SecurityHeadersMiddleware)
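
The same idea can also be written as a plain ASGI middleware that needs no framework imports, which makes it easy to test in isolation. The sketch below is an assumption-laden alternative, not the class shown above: SecurityHeadersASGI, demo_app, and collect_response are hypothetical names, only three of the headers are included for brevity, and unlike the version above it leaves any header the application already set untouched.

```python
import asyncio

# Header names are lowercase bytes, as the ASGI spec requires
SECURITY_HEADERS = [
    (b"x-frame-options", b"DENY"),
    (b"x-content-type-options", b"nosniff"),
    (b"referrer-policy", b"strict-origin-when-cross-origin"),
]


class SecurityHeadersASGI:
    """Hypothetical pure-ASGI counterpart to SecurityHeadersMiddleware."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":  # leave websocket/lifespan untouched
            await self.app(scope, receive, send)
            return

        async def send_with_headers(message):
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                present = {name.lower() for name, _ in headers}
                # Only add headers the application did not set itself
                headers += [(n, v) for n, v in SECURITY_HEADERS if n not in present]
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_with_headers)


async def demo_app(scope, receive, send):
    """Tiny stand-in ASGI app used only to exercise the middleware."""
    await send({"type": "http.response.start", "status": 200,
                "headers": [(b"content-type", b"text/plain")]})
    await send({"type": "http.response.body", "body": b"ok"})


async def collect_response(app):
    """Call an ASGI app once and return the messages it sent."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http"}, receive, send)
    return sent


messages = asyncio.run(collect_response(SecurityHeadersASGI(demo_app)))
print(dict(messages[0]["headers"]))
```

Because it follows the standard ASGI callable shape, a class like this should be usable with app.add_middleware in the same way as the BaseHTTPMiddleware version.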

7. Logging & Monitoring

Implement comprehensive security logging:

# logging_config.py
import logging
from pythonjsonlogger import jsonlogger
import sys

def setup_logging():
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # JSON formatter for structured logging
    logHandler = logging.StreamHandler(sys.stdout)
    formatter = jsonlogger.JsonFormatter(
        '%(asctime)s %(name)s %(levelname)s %(message)s'
    )
    logHandler.setFormatter(formatter)
    logger.addHandler(logHandler)

    return logger

# Security event logging
from fastapi import Request
import logging

logger = logging.getLogger(__name__)

async def log_security_event(
    request: Request,
    event_type: str,
    details: dict
):
    """Log security-relevant events"""
    logger.warning(
        "Security Event",
        extra={
            "event_type": event_type,
            # request.client can be None (e.g. some test clients or proxies)
            "ip_address": request.client.host if request.client else None,
            "user_agent": request.headers.get("user-agent"),
            "path": request.url.path,
            "method": request.method,
            **details
        }
    )

# Usage in routes
@router.post("/login")
async def login(request: Request, user: UserLogin):
    try:
        # ... authentication logic ...
        await log_security_event(
            request,
            "login_success",
            {"user_email": user.email}
        )
    except HTTPException:
        await log_security_event(
            request,
            "login_failed",
            {"user_email": user.email, "reason": "invalid_credentials"}
        )
        raise
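
To show what structured logging does with the extra fields, here is a minimal formatter built only on the standard library. It is a sketch of the idea rather than a drop-in replacement for python-json-logger; StdlibJsonFormatter is a hypothetical name, and the field names in the output are an assumption.

```python
import json
import logging

# Attribute names every LogRecord carries; anything else came in via extra={...}
_RESERVED = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {"message", "asctime"}


class StdlibJsonFormatter(logging.Formatter):
    """Render each record as one JSON object, including extra fields."""

    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Copy fields passed through extra=, such as event_type or ip_address
        payload.update({k: v for k, v in record.__dict__.items() if k not in _RESERVED})
        return json.dumps(payload, default=str)


sample_record = logging.makeLogRecord({
    "name": "security",
    "levelname": "WARNING",
    "msg": "Security Event",
    "event_type": "login_failed",
    "ip_address": "203.0.113.7",
})
print(StdlibJsonFormatter().format(sample_record))
```

One JSON object per line keeps fields such as event_type and ip_address queryable in a log aggregator without regex parsing.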

8. API Key Authentication (Alternative)

For service-to-service communication:

# security/api_keys.py
from fastapi import Depends, Security, HTTPException, status
from fastapi.security.api_key import APIKeyHeader
import secrets
import hashlib

API_KEY_NAME = "X-API-Key"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)

# Store hashed API keys (not plain text!)
VALID_API_KEYS = {
    hashlib.sha256(b"your-api-key-here").hexdigest(): {
        "client_name": "Service A",
        "permissions": ["read", "write"]
    }
}

async def get_api_key(api_key: str = Security(api_key_header)):
    """Validate API key"""
    if not api_key:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate API key"
        )

    # Hash the provided key
    key_hash = hashlib.sha256(api_key.encode()).hexdigest()

    if key_hash not in VALID_API_KEYS:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid API key"
        )

    return VALID_API_KEYS[key_hash]

# Generate new API keys
def generate_api_key() -> str:
    """Generate a secure random API key"""
    return secrets.token_urlsafe(32)

# Usage
@app.get("/api/protected")
async def protected_route(api_key: dict = Depends(get_api_key)):
    return {"client": api_key["client_name"], "data": "sensitive"}
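
The key lifecycle (generate once, store only the hash, compare hashes on each request) can be sketched without FastAPI. The helper names below are hypothetical, and the in-memory dictionary stands in for whatever datastore actually holds the hashed keys.

```python
import hashlib
import hmac
import secrets


def issue_api_key():
    """Return (raw_key, key_hash); show raw_key to the client once, store only the hash."""
    raw_key = secrets.token_urlsafe(32)
    return raw_key, hashlib.sha256(raw_key.encode()).hexdigest()


def find_client(presented_key, key_store):
    """Look up the client for a presented key, comparing hashes in constant time."""
    presented_hash = hashlib.sha256(presented_key.encode()).hexdigest()
    match = None
    for stored_hash, client in key_store.items():
        if hmac.compare_digest(stored_hash, presented_hash):
            match = client
    return match


raw_key, key_hash = issue_api_key()
key_store = {key_hash: {"client_name": "Service A", "permissions": ["read", "write"]}}
print(find_client(raw_key, key_store)["client_name"])
```

A fast hash like SHA-256 is reasonable here because the key is long and random; it would not be appropriate for user-chosen passwords, which is why the article uses bcrypt for those.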

9. Environment & Secrets Management

# config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache

class Settings(BaseSettings):
    # Pydantic v2 style configuration (replaces the inner Config class)
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=False)

    # Security
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30

    # Database
    database_url: str

    # Redis
    redis_url: str = "redis://localhost:6379"

    # CORS (set ALLOWED_ORIGINS as a JSON array in the environment)
    allowed_origins: list[str] = ["https://yourdomain.com"]

    # Rate Limiting
    rate_limit_per_minute: int = 60

@lru_cache()
def get_settings():
    return Settings()

# Usage
from config import get_settings

settings = get_settings()
SECRET_KEY = settings.secret_key
Environment Files

Never commit .env files to version control! Add them to .gitignore and use .env.example as a template.
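
One detail worth knowing for list-valued settings such as allowed_origins: pydantic-settings reads complex types from environment variables as JSON, so ALLOWED_ORIGINS would normally be written as a JSON array. The sketch below mimics that parsing with the standard library and adds a guard against a wildcard origin; parse_allowed_origins is a hypothetical helper, and the wildcard check is an assumption about what you want in production.

```python
import json


def parse_allowed_origins(raw: str) -> list:
    """Parse a JSON array of origins, rejecting wildcards and non-string entries."""
    origins = json.loads(raw)  # raises a ValueError subclass on invalid JSON
    if not isinstance(origins, list) or not all(isinstance(o, str) for o in origins):
        raise ValueError("ALLOWED_ORIGINS must be a JSON array of strings")
    if "*" in origins:
        raise ValueError("wildcard origin is not allowed in production settings")
    # Origins never carry a trailing slash, so normalize it away
    return [o.rstrip("/") for o in origins]


example = parse_allowed_origins('["https://yourdomain.com", "https://app.yourdomain.com/"]')
print(example)
```

A matching line in .env.example could then read ALLOWED_ORIGINS=["https://yourdomain.com"].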

10. Testing Security

Write tests for security features:

# tests/test_security.py
import pytest
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)

def test_login_with_invalid_credentials():
    """Test that invalid credentials are rejected"""
    response = client.post("/auth/login", json={
        "email": "test@example.com",  # placeholder address
        "password": "wrong_password"
    })
    assert response.status_code == 401

def test_access_protected_route_without_token():
    """Test that protected routes require authentication"""
    response = client.get("/api/protected")
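    # HTTPBearer rejects a missing Authorization header before the route runs;
    # depending on the FastAPI version this is reported as 403 or 401
    assert response.status_code in (401, 403)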

def test_access_protected_route_with_valid_token():
    """Test that valid tokens grant access"""
    # First login to get token
    login_response = client.post("/auth/login", json={
        "email": "test@example.com",  # placeholder; this user must exist in the test database
        "password": "Test123!@#"
    })
    token = login_response.json()["access_token"]

    # Access protected route
    response = client.get(
        "/api/protected",
        headers={"Authorization": f"Bearer {token}"}
    )
    assert response.status_code == 200

def test_rate_limiting():
    """Test that rate limiting works"""
    # Make requests until rate limit is hit
    for i in range(6):  # Assuming 5/minute limit
        response = client.get("/api/data")

    assert response.status_code == 429  # Too many requests

Production Checklist

Security Checklist

Before deploying to production, ensure:

  • ✅ All secrets are stored in environment variables or secret managers
  • ✅ HTTPS is enforced (no HTTP traffic)
  • ✅ CORS is properly configured with specific origins
  • ✅ Rate limiting is enabled on all public endpoints
  • ✅ Input validation is comprehensive
  • ✅ Security headers are applied
  • ✅ Logging is configured and monitored
  • ✅ Database queries use parameterization
  • ✅ Dependencies are up to date (run pip-audit)
  • ✅ Security tests pass

Conclusion

Security is not a feature—it’s a requirement. By implementing these patterns, you’ll protect your FastAPI application against the most common vulnerabilities. Remember:

  1. Defense in depth: Multiple security layers
  2. Least privilege: Grant minimum necessary permissions
  3. Fail securely: Default to denying access
  4. Keep it simple: Complex security is harder to audit
  5. Stay updated: Monitor security advisories


Written by Hisham Tariq

Backend Engineer & AI Researcher passionate about building secure, intelligent systems at the intersection of cybersecurity and artificial intelligence. Specializing in Python, FastAPI, and machine learning.