FastAPI is an excellent framework for building APIs, but like any web application, it requires careful security implementation. In this comprehensive guide, we’ll cover essential security practices to protect your FastAPI applications from common vulnerabilities.
Security Fundamentals
Before diving into implementation, let’s understand the key security principles:
We’ll address multiple OWASP Top 10 vulnerabilities including: Broken Authentication, Injection, Security Misconfiguration, and Insufficient Logging & Monitoring.
The Security Layers
- Authentication - Who are you?
- Authorization - What can you do?
- Data Protection - Encryption & validation
- Rate Limiting - Prevent abuse
- Monitoring - Detect threats
1. Authentication with JWT
Let’s implement secure JWT authentication:
# security/auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from typing import Optional
import os
# Configuration
SECRET_KEY = os.getenv("SECRET_KEY") # Generate with: openssl rand -hex 32
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
security = HTTPBearer()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify password against hash"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash a password"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Create JWT access token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire, "iat": datetime.utcnow()})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
"""Validate JWT and return current user"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
token = credentials.credentials
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
# In production, fetch user from database
return {"username": username, "user_id": payload.get("user_id")}
except JWTError:
raise credentials_exception
Never hardcode secrets! Use environment variables or a secrets manager like AWS Secrets Manager, HashiCorp Vault, or Azure Key Vault.
Login Endpoint
# routes/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, EmailStr
from security.auth import verify_password, create_access_token, get_password_hash
import database # Your database module
router = APIRouter(prefix="/auth", tags=["authentication"])
class UserLogin(BaseModel):
email: EmailStr
password: str
class UserRegister(BaseModel):
email: EmailStr
password: str
username: str
class Token(BaseModel):
access_token: str
token_type: str
@router.post("/login", response_model=Token)
async def login(user: UserLogin):
"""Authenticate user and return JWT token"""
# Fetch user from database
db_user = await database.get_user_by_email(user.email)
if not db_user or not verify_password(user.password, db_user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Create access token
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": db_user.email, "user_id": db_user.id},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.post("/register", response_model=Token)
async def register(user: UserRegister):
"""Register new user"""
# Check if user exists
existing_user = await database.get_user_by_email(user.email)
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
)
# Create user
hashed_password = get_password_hash(user.password)
new_user = await database.create_user(
email=user.email,
username=user.username,
hashed_password=hashed_password
)
# Generate token
access_token = create_access_token(
data={"sub": new_user.email, "user_id": new_user.id}
)
return {"access_token": access_token, "token_type": "bearer"}
2. Role-Based Access Control (RBAC)
Implement permission-based authorization:
# security/permissions.py
from enum import Enum
from fastapi import HTTPException, status
from typing import List
class Role(str, Enum):
ADMIN = "admin"
USER = "user"
MODERATOR = "moderator"
class Permission(str, Enum):
READ = "read"
WRITE = "write"
DELETE = "delete"
MANAGE_USERS = "manage_users"
# Role-Permission mapping
ROLE_PERMISSIONS = {
Role.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.MANAGE_USERS],
Role.MODERATOR: [Permission.READ, Permission.WRITE, Permission.DELETE],
Role.USER: [Permission.READ, Permission.WRITE]
}
def require_permissions(required_permissions: List[Permission]):
"""Decorator factory for permission checking"""
async def permission_checker(current_user: dict = Depends(get_current_user)):
# Get user role from database
user_role = current_user.get("role", Role.USER)
# Get permissions for role
user_permissions = ROLE_PERMISSIONS.get(user_role, [])
# Check if user has all required permissions
for permission in required_permissions:
if permission not in user_permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission denied: requires {permission}"
)
return current_user
return permission_checker
# Usage in routes
@router.delete("/users/{user_id}")
async def delete_user(
user_id: int,
current_user: dict = Depends(require_permissions([Permission.MANAGE_USERS]))
):
"""Delete user - requires MANAGE_USERS permission"""
await database.delete_user(user_id)
return {"message": "User deleted successfully"}
3. Rate Limiting
Protect against abuse with rate limiting:
# middleware/rate_limit.py
from fastapi import Request, HTTPException, status
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
import redis.asyncio as redis
# Initialize rate limiter
limiter = Limiter(key_func=get_remote_address)
# Redis for distributed rate limiting (production)
redis_client = redis.from_url("redis://localhost:6379")
async def rate_limit_check(request: Request, key: str, limit: int, window: int):
"""
Check rate limit using Redis
Args:
key: Unique identifier (IP, user ID, API key)
limit: Max requests allowed
window: Time window in seconds
"""
current = await redis_client.incr(key)
if current == 1:
await redis_client.expire(key, window)
if current > limit:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"Rate limit exceeded. Try again in {window} seconds."
)
# Apply to FastAPI app
from fastapi import FastAPI
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Usage in routes
from slowapi import Limiter
@app.get("/api/data")
@limiter.limit("5/minute") # 5 requests per minute
async def get_data(request: Request):
return {"data": "sensitive information"}
For production, use Redis or Memcached for distributed rate limiting across multiple servers. In-memory limiting only works for single-instance deployments.
4. CORS Configuration
Properly configure Cross-Origin Resource Sharing:
# main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
# DEVELOPMENT - Allow all origins (NOT for production!)
# app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"],
# allow_credentials=True,
# allow_methods=["*"],
# allow_headers=["*"],
# )
# PRODUCTION - Restrict origins
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://yourdomain.com",
"https://app.yourdomain.com",
],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
max_age=3600, # Cache preflight requests for 1 hour
)
5. Input Validation & SQL Injection Prevention
FastAPI + Pydantic provides excellent validation:
from pydantic import BaseModel, EmailStr, constr, validator
from typing import Optional
import re
class UserCreate(BaseModel):
email: EmailStr # Automatically validates email format
username: constr(min_length=3, max_length=50) # Length constraints
password: constr(min_length=8)
bio: Optional[constr(max_length=500)] = None
@validator('password')
def password_strength(cls, v):
"""Ensure strong password"""
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain lowercase letter')
if not re.search(r'[0-9]', v):
raise ValueError('Password must contain digit')
if not re.search(r'[^A-Za-z0-9]', v):
raise ValueError('Password must contain special character')
return v
@validator('username')
def username_alphanumeric(cls, v):
"""Only allow alphanumeric usernames"""
if not re.match(r'^[a-zA-Z0-9_]+$', v):
raise ValueError('Username must be alphanumeric')
return v
# SQL Injection Prevention with SQLAlchemy/Databases
from databases import Database
database = Database("postgresql://user:pass@localhost/db")
# SAFE - Parameterized query
async def get_user(email: str):
query = "SELECT * FROM users WHERE email = :email"
return await database.fetch_one(query=query, values={"email": email})
# UNSAFE - String interpolation (DON'T DO THIS!)
# async def get_user_unsafe(email: str):
# query = f"SELECT * FROM users WHERE email = '{email}'" # VULNERABLE!
# return await database.fetch_one(query=query)
6. Security Headers
Add security headers to all responses:
# middleware/security_headers.py
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# Prevent clickjacking
response.headers["X-Frame-Options"] = "DENY"
# Prevent MIME sniffing
response.headers["X-Content-Type-Options"] = "nosniff"
# Enable XSS protection
response.headers["X-XSS-Protection"] = "1; mode=block"
# Strict Transport Security (HTTPS only)
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
# Content Security Policy
response.headers["Content-Security-Policy"] = "default-src 'self'"
# Referrer Policy
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Permissions Policy
response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"
return response
# Add to app
app.add_middleware(SecurityHeadersMiddleware)
7. Logging & Monitoring
Implement comprehensive security logging:
# logging_config.py
import logging
from pythonjsonlogger import jsonlogger
import sys
def setup_logging():
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# JSON formatter for structured logging
logHandler = logging.StreamHandler(sys.stdout)
formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(name)s %(levelname)s %(message)s'
)
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
return logger
# Security event logging
from fastapi import Request
import logging
logger = logging.getLogger(__name__)
async def log_security_event(
request: Request,
event_type: str,
details: dict
):
"""Log security-relevant events"""
logger.warning(
"Security Event",
extra={
"event_type": event_type,
"ip_address": request.client.host,
"user_agent": request.headers.get("user-agent"),
"path": request.url.path,
"method": request.method,
**details
}
)
# Usage in routes
@router.post("/login")
async def login(request: Request, user: UserLogin):
try:
# ... authentication logic ...
await log_security_event(
request,
"login_success",
{"user_email": user.email}
)
except HTTPException:
await log_security_event(
request,
"login_failed",
{"user_email": user.email, "reason": "invalid_credentials"}
)
raise
8. API Key Authentication (Alternative)
For service-to-service communication:
# security/api_keys.py
from fastapi import Security, HTTPException, status
from fastapi.security.api_key import APIKeyHeader
import secrets
import hashlib
API_KEY_NAME = "X-API-Key"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
# Store hashed API keys (not plain text!)
VALID_API_KEYS = {
hashlib.sha256(b"your-api-key-here").hexdigest(): {
"client_name": "Service A",
"permissions": ["read", "write"]
}
}
async def get_api_key(api_key: str = Security(api_key_header)):
"""Validate API key"""
if not api_key:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate API key"
)
# Hash the provided key
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
if key_hash not in VALID_API_KEYS:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Invalid API key"
)
return VALID_API_KEYS[key_hash]
# Generate new API keys
def generate_api_key() -> str:
"""Generate a secure random API key"""
return secrets.token_urlsafe(32)
# Usage
@app.get("/api/protected")
async def protected_route(api_key: dict = Depends(get_api_key)):
return {"client": api_key["client_name"], "data": "sensitive"}
9. Environment & Secrets Management
# config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
# Security
secret_key: str
algorithm: str = "HS256"
access_token_expire_minutes: int = 30
# Database
database_url: str
# Redis
redis_url: str = "redis://localhost:6379"
# CORS
allowed_origins: list[str] = ["https://yourdomain.com"]
# Rate Limiting
rate_limit_per_minute: int = 60
class Config:
env_file = ".env"
case_sensitive = False
@lru_cache()
def get_settings():
return Settings()
# Usage
from config import get_settings
settings = get_settings()
SECRET_KEY = settings.secret_key
Never commit .env files to version control! Add them to .gitignore and use .env.example as a template.
10. Testing Security
Write tests for security features:
# tests/test_security.py
import pytest
from fastapi.testclient import TestClient
from main import app
import jwt
client = TestClient(app)
def test_login_with_invalid_credentials():
"""Test that invalid credentials are rejected"""
response = client.post("/auth/login", json={
"email": "[email protected]",
"password": "wrong_password"
})
assert response.status_code == 401
def test_access_protected_route_without_token():
"""Test that protected routes require authentication"""
response = client.get("/api/protected")
assert response.status_code == 403
def test_access_protected_route_with_valid_token():
"""Test that valid tokens grant access"""
# First login to get token
login_response = client.post("/auth/login", json={
"email": "[email protected]",
"password": "Test123!@#"
})
token = login_response.json()["access_token"]
# Access protected route
response = client.get(
"/api/protected",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
def test_rate_limiting():
"""Test that rate limiting works"""
# Make requests until rate limit is hit
for i in range(6): # Assuming 5/minute limit
response = client.get("/api/data")
assert response.status_code == 429 # Too many requests
Production Checklist
Before deploying to production, ensure:
- ✅ All secrets are stored in environment variables or secret managers
- ✅ HTTPS is enforced (no HTTP traffic)
- ✅ CORS is properly configured with specific origins
- ✅ Rate limiting is enabled on all public endpoints
- ✅ Input validation is comprehensive
- ✅ Security headers are applied
- ✅ Logging is configured and monitored
- ✅ Database queries use parameterization
- ✅ Dependencies are up to date (run
pip-audit) - ✅ Security tests pass
Conclusion
Security is not a feature—it’s a requirement. By implementing these patterns, you’ll protect your FastAPI application against the most common vulnerabilities. Remember:
- Defense in depth: Multiple security layers
- Least privilege: Grant minimum necessary permissions
- Fail securely: Default to denying access
- Keep it simple: Complex security is harder to audit
- Stay updated: Monitor security advisories
References
- FastAPI Security Documentation Official FastAPI security and authentication guide
- OWASP Top Ten Most critical web application security risks
- JWT Best Practices RFC 8725: JSON Web Token Best Current Practices
- Python passlib Documentation Password hashing library for Python
- Security Headers Reference Scan and analyze HTTP security headers
- Rate Limiting Patterns Google Cloud rate limiting strategies and techniques