编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

增强 FastAPI 安全性:防范常见漏洞

wxchong 2024-11-10 12:16:31 开源技术 55 ℃ 0 评论

FastAPI 是一种现代、快速(高性能)的 Web 框架,用于使用 Python 构建 API。它基于 Starlette(Web 部分)和 Pydantic(数据部分)构建,设计为易于使用,同时提供强大的功能。但是,与任何 Web 框架一样,确保 FastAPI 应用程序的安全性对于保护您的数据和用户至关重要。在这篇博文中,我们将讨论一些常见漏洞以及如何在 FastAPI 中防范这些漏洞。

1. SQL 注入

SQL 注入是一种可以破坏数据库的代码注入技术。它是最常见的 Web 黑客技术之一。

解决方案:使用参数化查询或 ORM。

演示:将 SQLAlchemy 与 FastAPI 结合使用

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from . import models, schemas, database


app = FastAPI()


@app.post("/items/")
def create_item(item: schemas.ItemCreate, db: Session = Depends(database.get_db)):
    db_item = models.Item(name=item.name, description=item.description, price=item.price)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

在此示例中,SQLAlchemy 的 ORM 自动处理参数化,防止 SQL 注入。

2. 跨站点脚本 (XSS)

XSS 是一种漏洞,允许攻击者将恶意脚本注入原本受信任的网站的内容中。

解决方案:使用 Jinja2 等模板引擎自动转义输入。

演示:在 Jinja2 模板中转义用户输入

<!DOCTYPE html>
<html>
<head>
    <title>FastAPI App</title>
</head>
<body>
    <h1>Hello, {{ username | e }}</h1>
</body>
</html>

在此示例中,Jinja2 中的 ‘| e ’过滤器会转义 username 变量中任何潜在的危险字符。

3. 跨站点请求伪造 (CSRF)

CSRF 是一种攻击,它迫使最终用户在他们当前经过身份验证的 Web 应用程序上执行不必要的操作。

解决方案:使用 CSRF 令牌。

演示:在 FastAPI 中实现 CSRF 保护

from fastapi import FastAPI, Request, Form
from fastapi.responses import HTMLResponse
from fastapi.security import OAuth2PasswordBearer


app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


@app.get("/", response_class=HTMLResponse)
def get_form():
    return """
        <form action="/submit" method="post">
            <input type="hidden" name="csrf_token" value="12345678"/>
            <input type="submit" value="Submit"/>
        </form>
    """


@app.post("/submit")
def submit_form(csrf_token: str = Form(...), token: str = Depends(oauth2_scheme)):
    if csrf_token != "12345678":
        raise HTTPException(status_code=400, detail="Invalid CSRF token")
    return {"message": "Form submitted successfully"}

4. 不安全的反序列化

不安全的反序列化是一种漏洞,当使用不受信任的数据滥用应用程序的逻辑时就会发生这种情况。

解决方案:使用安全的库进行反序列化并始终验证数据。

演示:使用 Pydantic 进行安全反序列化

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, ValidationError


app = FastAPI()


class Item(BaseModel):
    name: str
    price: float


@app.post("/items/")
def create_item(item: Item):
    try:
        item = Item(**item.dict())
    except ValidationError as e:
        raise HTTPException(status_code=400, detail="Invalid data")
    return item

5. 日志记录和监控不足

日志记录和监控不足可能会使攻击者在利用漏洞后不被发现。

解决方案:实施适当的日志记录和监控机制。

演示:在 FastAPI 中使用日志记录

import logging
from fastapi import FastAPI, Request


app = FastAPI()


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@app.middleware("http")
async def log_requests(request: Request, call_next):
    logger.info(f"Request: {request.method} {request.url}")
    response = await call_next(request)
    logger.info(f"Response status: {response.status_code}")
    return response

在此示例中,我们使用中间件来记录每个请求和响应。

以下是有关如何防范 FastAPI 应用程序中常见漏洞的其他子主题和演示(其实,看过小编之前写的文章的同学,可以看出来有点重复了。所以这里就简单介绍下,想了解详细内容的话,建议关注小编,通过合集查看之前的安全类文章吧):

1. 安全标头

安全标头可以保护您的应用程序免受各种攻击,包括 XSS、点击劫持和 MIME 类型嗅探。

演示:在 FastAPI 中设置安全标头

from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.trustedhost import TrustedHostMiddleware


app = FastAPI()


@app.middleware("http")
async def add_security_headers(request, call_next):
    response = await call_next(request)
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"
    return response

2. 速率限制

速率限制通过限制用户在给定时间内可以发出的请求数量来帮助保护您的应用程序免受暴力攻击和滥用期间。

演示:在 FastAPI 中实现速率限制

from fastapi import FastAPI, Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
import time


app = FastAPI()


class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, rate: int, burst: int):
        super().__init__(app)
        self.rate = rate
        self.burst = burst
        self.tokens = burst
        self.timestamp = time.monotonic()


    async def dispatch(self, request: Request, call_next):
        now = time.monotonic()
        time_passed = now - self.timestamp
        self.timestamp = now
        self.tokens += time_passed * self.rate


        if self.tokens > self.burst:
            self.tokens = self.burst


        if self.tokens < 1:
            raise HTTPException(status_code=429, detail="Too Many Requests")


        self.tokens -= 1
        response = await call_next(request)
        return response


app.add_middleware(RateLimitMiddleware, rate=1, burst=5)


@app.get("/")
async def read_root():
    return {"message": "Hello, world!"}

3. 安全密码存储

安全存储密码有助于保护用户数据,即使您的数据库受到威胁。

演示:在 FastAPI 中使用 bcrypt 进行安全密码哈希处理

from fastapi import FastAPI, HTTPException, Depends
from passlib.context import CryptContext
from pydantic import BaseModel


app = FastAPI()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class User(BaseModel):
    username: str
    password: str


fake_db = {}


def hash_password(password: str):
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str):
    return pwd_context.verify(plain_password, hashed_password)


@app.post("/register/")
def register_user(user: User):
    if user.username in fake_db:
        raise HTTPException(status_code=400, detail="Username already registered")
    hashed_password = hash_password(user.password)
    fake_db[user.username] = hashed_password
    return {"message": "User registered successfully"}


@app.post("/login/")
def login_user(user: User):
    hashed_password = fake_db.get(user.username)
    if not hashed_password or not verify_password(user.password, hashed_password):
        raise HTTPException(status_code=400, detail="Invalid credentials")
    return {"message": "Login successful"}

4. 使用 HTTPS

HTTPS 通过加密传输的数据来确保客户端和服务器之间的安全通信。

演示:使用 uvicorn 通过 HTTPS 运行 FastAPI

# 首先,生成 SSL 证书(出于演示目的,自签名)
openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365


# 然后,通过 HTTPS 运行 FastAPI
uvicorn myapp:app --host 0.0.0.0 --port 8000 --ssl-keyfile=key.pem --ssl-certfile=cert.pem

5. 访问控制

访问控制可确保用户只能访问他们有权访问的资源。

演示:FastAPI 中的基于角色的访问控制 (RBAC)

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel


app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


fake_users_db = {
    "user1": {"username": "user1", "password": "password1", "role": "user"},
    "admin1": {"username": "admin1", "password": "password1", "role": "admin"},
}


class User(BaseModel):
    username: str
    role: str


def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_users_db.get(token)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid authentication credentials")
    return User(username=user["username"], role=user["role"])


def admin_required(user: User = Depends(get_current_user)):
    if user.role != "admin":
        raise HTTPException(status_code=403, detail="Admin privileges required")


@app.get("/admin/")
def read_admin_data(user: User = Depends(admin_required)):
    return {"admin_data": "This is admin data"}


@app.get("/user/")
def read_user_data(user: User = Depends(get_current_user)):
    return {"user_data": "This is user data"}

通过实施这些做法并利用提供的演示,您可以显著增强 FastAPI 应用程序的安全性并保护它们免受常见漏洞的侵害。

保护您的 FastAPI 应用程序涉及了解和缓解各种漏洞。通过使用参数化查询、转义用户输入、实施 CSRF 保护、安全地反序列化数据以及确保正确的日志记录和监控,您可以保护您的应用程序免受常见攻击。始终了解最新的安全实践和库,以确保您的应用程序安全。

写在最后:

  1. 安全是一个持久的过程,关注小编不迷路。也欢迎你留言和小编一起探讨安全话题

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表