200字
Python 异步 Web 开发指南:FastAPI 与异步生态深度实践
2025-11-07
2025-11-07

上一篇文章开头我列出了三个常见的 Python Web 服务框架,这篇文章我决定以 FastAPI 为引子仔细的讲一下Python 的异步编程。

异步编程基础与核心理念

理解异步编程模型

异步编程是一种非阻塞的编程范式,特别适合处理I/O密集型任务。与传统同步编程相比,异步模型能够在等待I/O操作完成时释放CPU资源,从而显著提升应用程序的并发处理能力。

同步与异步对比示例

# 同步方式 - 顺序执行,阻塞等待
import time

def sync_fetch(url):
    print(f"开始请求 {url}")
    time.sleep(2)  # 模拟网络请求
    print(f"完成请求 {url}")
    return f"响应来自 {url}"

# 执行多个请求(总耗时约6秒)
start = time.time()
results = [sync_fetch(f"https://example.com/{i}") for i in range(3)]
print(f"同步总耗时: {time.time() - start:.2f}秒")
# 异步方式 - 并发执行,非阻塞
import asyncio

async def async_fetch(url):
    print(f"开始请求 {url}")
    await asyncio.sleep(2)  # 异步等待
    print(f"完成请求 {url}")
    return f"响应来自 {url}"

async def main():
    # 并发执行多个请求(总耗时约2秒)
    start = time.time()
    tasks = [async_fetch(f"https://example.com/{i}") for i in range(3)]
    results = await asyncio.gather(*tasks)
    print(f"异步总耗时: {time.time() - start:.2f}秒")

# 运行异步程序
asyncio.run(main())

事件循环机制解析

事件循环是异步编程的核心引擎,负责调度和执行异步任务。理解其工作原理对于编写高效的异步代码至关重要。

import asyncio
import uvloop

# 使用uvloop替代默认事件循环(性能提升)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

async def understanding_event_loop():
    print("1. 任务进入事件循环")
  
    # 创建多个异步任务
    task1 = asyncio.create_task(asyncio.sleep(1, "任务1完成"))
    task2 = asyncio.create_task(asyncio.sleep(2, "任务2完成"))
    task3 = asyncio.create_task(asyncio.sleep(0.5, "任务3完成"))
  
    # 任务按照完成顺序返回
    for completed_task in asyncio.as_completed([task1, task2, task3]):
        result = await completed_task
        print(f"完成: {result}")
  
    print("所有任务执行完毕")

# 运行示例
asyncio.run(understanding_event_loop())

FastAPI框架深度解析

现代化API开发体验

FastAPI基于Python类型提示,提供自动化的API文档生成、数据验证和序列化功能。

基础应用搭建

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, EmailStr, validator
from typing import List, Optional
from datetime import datetime

# 初始化应用
app = FastAPI(
    title="异步API服务",
    description="基于FastAPI构建的高性能Web服务",
    version="1.0.0"
)

# 数据模型定义
class UserCreate(BaseModel):
    username: str
    email: EmailStr
    password: str
  
    @validator('username')
    def username_alphanumeric(cls, v):
        if not v.isalnum():
            raise ValueError('用户名必须为字母数字组合')
        return v
  
    @validator('password')
    def password_strength(cls, v):
        if len(v) < 8:
            raise ValueError('密码长度至少8位')
        return v

class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    created_at: datetime
  
    class Config:
        orm_mode = True

# 基础路由示例
@app.get("/")
async def root():
    return {"message": "欢迎使用FastAPI", "timestamp": datetime.utcnow()}

@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate):
    # 模拟用户创建逻辑
    user_data = {
        "id": 1,
        "username": user.username,
        "email": user.email,
        "created_at": datetime.utcnow()
    }
    return user_data

@app.get("/users/{user_id}", response_model=UserResponse)
async def read_user(user_id: int, q: Optional[str] = None):
    if user_id > 100:
        raise HTTPException(status_code=404, detail="用户不存在")
  
    return {
        "id": user_id,
        "username": f"user_{user_id}",
        "email": f"user{user_id}@example.com",
        "created_at": datetime.utcnow()
    }

依赖注入系统

FastAPI的依赖注入系统让代码更加模块化和可测试。

from fastapi import Depends, Header, Query
import jwt
from jwt.exceptions import InvalidTokenError

# 模拟数据库
fake_users_db = {
    "user1": {"username": "user1", "email": "user1@example.com", "disabled": False}
}

# 依赖项定义
async def get_token_header(authorization: str = Header(...)):
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authentication scheme")
  
    token = authorization.replace("Bearer ", "")
    try:
        payload = jwt.decode(token, "secret", algorithms=["HS256"])
        return payload
    except InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

async def get_current_user(token_payload: dict = Depends(get_token_header)):
    username = token_payload.get("sub")
    if username not in fake_users_db:
        raise HTTPException(status_code=404, detail="User not found")
  
    user = fake_users_db[username]
    if user.get("disabled"):
        raise HTTPException(status_code=400, detail="Inactive user")
  
    return user

# 使用依赖项的路由
@app.get("/users/me/", response_model=UserResponse)
async def read_users_me(current_user: dict = Depends(get_current_user)):
    return current_user

# 可重用的查询参数依赖
async def common_parameters(
    skip: int = Query(0, ge=0, description="跳过记录数"),
    limit: int = Query(100, ge=1, le=1000, description="返回记录数")
):
    return {"skip": skip, "limit": limit}

@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
    return {
        "skip": commons["skip"],
        "limit": commons["limit"],
        "items": [f"item_{i}" for i in range(commons["skip"], commons["skip"] + commons["limit"])]
    }

异步数据库集成

SQLAlchemy异步操作

使用SQLAlchemy 1.4+的异步支持进行数据库操作。

异步数据库配置

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, DateTime, func

# 数据库配置
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

# 创建异步引擎
engine = create_async_engine(
    DATABASE_URL,
    echo=True,  # 开发时显示SQL语句
    pool_size=20,
    max_overflow=0
)

# 异步会话工厂
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

Base = declarative_base()

# 异步数据模型
class User(Base):
    __tablename__ = "users"
  
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String(50), unique=True, index=True)
    email = Column(String(100), unique=True, index=True)
    hashed_password = Column(String(100))
    created_at = Column(DateTime, server_default=func.now())

# 数据库依赖
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

# 异步CRUD操作
from sqlalchemy.future import select
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class UserCRUD:
    @staticmethod
    def verify_password(plain_password, hashed_password):
        return pwd_context.verify(plain_password, hashed_password)
  
    @staticmethod
    def get_password_hash(password):
        return pwd_context.hash(password)
  
    @staticmethod
    async def get_user_by_email(db: AsyncSession, email: str):
        result = await db.execute(select(User).where(User.email == email))
        return result.scalar_one_or_none()
  
    @staticmethod
    async def create_user(db: AsyncSession, user_data: UserCreate):
        hashed_password = UserCRUD.get_password_hash(user_data.password)
        db_user = User(
            username=user_data.username,
            email=user_data.email,
            hashed_password=hashed_password
        )
        db.add(db_user)
        await db.commit()
        await db.refresh(db_user)
        return db_user

# 使用异步CRUD的路由
@app.post("/async-users/", response_model=UserResponse)
async def create_async_user(
    user: UserCreate,
    db: AsyncSession = Depends(get_db)
):
    # 检查用户是否已存在
    existing_user = await UserCRUD.get_user_by_email(db, user.email)
    if existing_user:
        raise HTTPException(
            status_code=400,
            detail="邮箱已被注册"
        )
  
    # 创建新用户
    db_user = await UserCRUD.create_user(db, user)
    return db_user

Redis异步缓存集成

import redis.asyncio as redis
from fastapi import BackgroundTasks

# Redis连接池
redis_pool = redis.ConnectionPool.from_url(
    "redis://localhost:6379",
    decode_responses=True
)

async def get_redis():
    return redis.Redis(connection_pool=redis_pool)

# 缓存装饰器
def cache_response(ttl: int = 300):
    def decorator(func):
        async def wrapper(*args, **kwargs):
            redis_client = await get_redis()
            cache_key = f"cache:{func.__name__}:{str(args)}:{str(kwargs)}"
        
            # 尝试从缓存获取
            cached_data = await redis_client.get(cache_key)
            if cached_data:
                return cached_data
        
            # 执行原函数
            result = await func(*args, **kwargs)
        
            # 异步缓存结果
            await redis_client.setex(cache_key, ttl, str(result))
            return result
        return wrapper
    return decorator

# 使用缓存的接口
@app.get("/cached-data/")
@cache_response(ttl=60)  # 缓存60秒
async def get_cached_data():
    # 模拟耗时操作
    await asyncio.sleep(2)
    return {"data": "这是缓存的数据", "timestamp": datetime.utcnow()}

WebSocket实时通信

双向实时通信实现

from fastapi import WebSocket, WebSocketDisconnect
from typing import List

class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []
  
    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
  
    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)
  
    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)
  
    async def broadcast(self, message: str):
        for connection in self.active_connections:
            try:
                await connection.send_text(message)
            except Exception:
                # 处理断开连接的客户端
                self.disconnect(connection)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            # 接收客户端消息
            data = await websocket.receive_text()
        
            # 处理消息并广播
            response = f"客户端{client_id}说: {data}"
            await manager.broadcast(response)
        
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"客户端{client_id}已断开连接")

# 实时数据推送示例
import json
import asyncio

@app.websocket("/ws/stock/{symbol}")
async def stock_updates(websocket: WebSocket, symbol: str):
    await websocket.accept()
    try:
        while True:
            # 模拟实时股票数据
            import random
            price_data = {
                "symbol": symbol,
                "price": round(100 + random.uniform(-5, 5), 2),
                "timestamp": datetime.utcnow().isoformat(),
                "volume": random.randint(1000, 10000)
            }
        
            await websocket.send_json(price_data)
            await asyncio.sleep(1)  # 每秒推送一次
        
    except WebSocketDisconnect:
        print(f"股票 {symbol} 的WebSocket连接已关闭")

高性能中间件与背景任务

自定义中间件开发

from fastapi import Request
import time
import uuid

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    # 请求唯一标识
    request_id = str(uuid.uuid4())
    request.state.request_id = request_id
  
    start_time = time.time()
  
    # 处理请求
    response = await call_next(request)
  
    # 计算处理时间
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    response.headers["X-Request-ID"] = request_id
  
    # 记录日志
    print(f"请求 {request.method} {request.url} 处理耗时: {process_time:.4f}秒")
  
    return response

# 速率限制中间件
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/limited/")
@limiter.limit("5/minute")
async def limited_endpoint(request: Request):
    return {"message": "这是一个速率受限的接口"}

# 背景任务处理
from fastapi import BackgroundTasks

async def send_notification(email: str, message: str):
    # 模拟发送邮件耗时
    await asyncio.sleep(2)
    print(f"已发送通知到 {email}: {message}")

@app.post("/notify/{email}")
async def send_notification_endpoint(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    background_tasks.add_task(send_notification, email, message)
    return {"message": "通知已加入后台任务队列"}

测试与部署策略

异步测试框架

import pytest
from httpx import AsyncClient
from .main import app

@pytest.fixture
async def async_client():
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client

@pytest.mark.asyncio
async def test_root_endpoint(async_client):
    response = await async_client.get("/")
    assert response.status_code == 200
    data = response.json()
    assert "message" in data
    assert "timestamp" in data

@pytest.mark.asyncio
async def test_user_creation(async_client):
    user_data = {
        "username": "testuser",
        "email": "test@example.com",
        "password": "securepassword123"
    }
  
    response = await async_client.post("/users/", json=user_data)
    assert response.status_code == 200
  
    user_response = response.json()
    assert user_response["username"] == user_data["username"]
    assert user_response["email"] == user_data["email"]
    assert "id" in user_response

# 数据库测试夹具
@pytest.fixture
async def test_db():
    # 创建测试数据库
    test_engine = create_async_engine("sqlite+aiosqlite:///./test.db")
    async with test_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
  
    async_session = sessionmaker(
        test_engine, class_=AsyncSession, expire_on_commit=False
    )
  
    yield async_session
  
    # 清理测试数据库
    async with test_engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)

生产环境部署

Docker配置

FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# 安装uvicorn和gunicorn
RUN pip install uvicorn[standard] gunicorn

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["gunicorn", "main:app", "--workers", "4", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]

性能监控配置

from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Response

# 定义指标
REQUEST_COUNT = Counter('request_count', 'App Request Count', ['method', 'endpoint'])
REQUEST_LATENCY = Histogram('request_latency_seconds', 'Request latency', ['method', 'endpoint'])

@app.middleware("http")
async def monitor_requests(request: Request, call_next):
    start_time = time.time()
  
    response = await call_next(request)
  
    # 记录指标
    REQUEST_COUNT.labels(method=request.method, endpoint=request.url.path).inc()
    REQUEST_LATENCY.labels(method=request.method, endpoint=request.url.path).observe(
        time.time() - start_time
    )
  
    return response

@app.get("/metrics")
async def metrics():
    return Response(generate_latest())

最佳实践与性能优化

连接池管理

from databases import Database

# 使用databases库进行连接池管理
database = Database(DATABASE_URL)

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

# 使用连接池的查询
@app.get("/optimized-users/")
async def get_optimized_users(skip: int = 0, limit: int = 100):
    query = "SELECT * FROM users ORDER BY id LIMIT :limit OFFSET :skip"
    users = await database.fetch_all(query, values={"limit": limit, "skip": skip})
    return users

异步任务队列集成

import asyncio
from concurrent.futures import ThreadPoolExecutor

# 创建线程池处理CPU密集型任务
thread_pool = ThreadPoolExecutor(max_workers=4)

async def run_cpu_intensive_task(data):
    loop = asyncio.get_event_loop()
  
    # 将CPU密集型任务放到线程池执行
    result = await loop.run_in_executor(
        thread_pool, 
        lambda: cpu_intensive_processing(data)
    )
    return result

def cpu_intensive_processing(data):
    # 模拟CPU密集型计算
    import hashlib
    return hashlib.sha256(data.encode()).hexdigest()

@app.post("/process-data/")
async def process_data_endpoint(data: str):
    result = await run_cpu_intensive_task(data)
    return {"hash": result}

这个完整的FastAPI异步开发指南涵盖了从基础概念到生产部署的全方位内容。通过合理运用异步编程模式、连接池管理、缓存策略和实时通信技术,可以构建出高性能、可扩展的现代Web应用程序。在实际项目中,建议根据具体需求选择合适的组件和优化策略,持续监控和调整系统性能。

Python 异步 Web 开发指南:FastAPI 与异步生态深度实践
作者
YeiJ
发表于
2025-11-07
License
CC BY-NC-SA 4.0

评论