上一篇文章开头我列出了三个常见的 Python Web 服务框架,这篇文章我决定以 FastAPI 为引子仔细的讲一下Python 的异步编程。
异步编程基础与核心理念
理解异步编程模型
异步编程是一种非阻塞的编程范式,特别适合处理I/O密集型任务。与传统同步编程相比,异步模型能够在等待I/O操作完成时释放CPU资源,从而显著提升应用程序的并发处理能力。
同步与异步对比示例:
# 同步方式 - 顺序执行,阻塞等待
import time
def sync_fetch(url):
print(f"开始请求 {url}")
time.sleep(2) # 模拟网络请求
print(f"完成请求 {url}")
return f"响应来自 {url}"
# 执行多个请求(总耗时约6秒)
start = time.time()
results = [sync_fetch(f"https://example.com/{i}") for i in range(3)]
print(f"同步总耗时: {time.time() - start:.2f}秒")
# 异步方式 - 并发执行,非阻塞
import asyncio
async def async_fetch(url):
print(f"开始请求 {url}")
await asyncio.sleep(2) # 异步等待
print(f"完成请求 {url}")
return f"响应来自 {url}"
async def main():
# 并发执行多个请求(总耗时约2秒)
start = time.time()
tasks = [async_fetch(f"https://example.com/{i}") for i in range(3)]
results = await asyncio.gather(*tasks)
print(f"异步总耗时: {time.time() - start:.2f}秒")
# 运行异步程序
asyncio.run(main())
事件循环机制解析
事件循环是异步编程的核心引擎,负责调度和执行异步任务。理解其工作原理对于编写高效的异步代码至关重要。
import asyncio
import uvloop
# 使用uvloop替代默认事件循环(性能提升)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
async def understanding_event_loop():
print("1. 任务进入事件循环")
# 创建多个异步任务
task1 = asyncio.create_task(asyncio.sleep(1, "任务1完成"))
task2 = asyncio.create_task(asyncio.sleep(2, "任务2完成"))
task3 = asyncio.create_task(asyncio.sleep(0.5, "任务3完成"))
# 任务按照完成顺序返回
for completed_task in asyncio.as_completed([task1, task2, task3]):
result = await completed_task
print(f"完成: {result}")
print("所有任务执行完毕")
# 运行示例
asyncio.run(understanding_event_loop())
FastAPI框架深度解析
现代化API开发体验
FastAPI基于Python类型提示,提供自动化的API文档生成、数据验证和序列化功能。
基础应用搭建:
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, EmailStr, validator
from typing import List, Optional
from datetime import datetime
# 初始化应用
app = FastAPI(
title="异步API服务",
description="基于FastAPI构建的高性能Web服务",
version="1.0.0"
)
# 数据模型定义
class UserCreate(BaseModel):
username: str
email: EmailStr
password: str
@validator('username')
def username_alphanumeric(cls, v):
if not v.isalnum():
raise ValueError('用户名必须为字母数字组合')
return v
@validator('password')
def password_strength(cls, v):
if len(v) < 8:
raise ValueError('密码长度至少8位')
return v
class UserResponse(BaseModel):
id: int
username: str
email: str
created_at: datetime
class Config:
orm_mode = True
# 基础路由示例
@app.get("/")
async def root():
return {"message": "欢迎使用FastAPI", "timestamp": datetime.utcnow()}
@app.post("/users/", response_model=UserResponse)
async def create_user(user: UserCreate):
# 模拟用户创建逻辑
user_data = {
"id": 1,
"username": user.username,
"email": user.email,
"created_at": datetime.utcnow()
}
return user_data
@app.get("/users/{user_id}", response_model=UserResponse)
async def read_user(user_id: int, q: Optional[str] = None):
if user_id > 100:
raise HTTPException(status_code=404, detail="用户不存在")
return {
"id": user_id,
"username": f"user_{user_id}",
"email": f"user{user_id}@example.com",
"created_at": datetime.utcnow()
}
依赖注入系统
FastAPI的依赖注入系统让代码更加模块化和可测试。
from fastapi import Depends, Header, Query
import jwt
from jwt.exceptions import InvalidTokenError
# 模拟数据库
fake_users_db = {
"user1": {"username": "user1", "email": "user1@example.com", "disabled": False}
}
# 依赖项定义
async def get_token_header(authorization: str = Header(...)):
if not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Invalid authentication scheme")
token = authorization.replace("Bearer ", "")
try:
payload = jwt.decode(token, "secret", algorithms=["HS256"])
return payload
except InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
async def get_current_user(token_payload: dict = Depends(get_token_header)):
username = token_payload.get("sub")
if username not in fake_users_db:
raise HTTPException(status_code=404, detail="User not found")
user = fake_users_db[username]
if user.get("disabled"):
raise HTTPException(status_code=400, detail="Inactive user")
return user
# 使用依赖项的路由
@app.get("/users/me/", response_model=UserResponse)
async def read_users_me(current_user: dict = Depends(get_current_user)):
return current_user
# 可重用的查询参数依赖
async def common_parameters(
skip: int = Query(0, ge=0, description="跳过记录数"),
limit: int = Query(100, ge=1, le=1000, description="返回记录数")
):
return {"skip": skip, "limit": limit}
@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
return {
"skip": commons["skip"],
"limit": commons["limit"],
"items": [f"item_{i}" for i in range(commons["skip"], commons["skip"] + commons["limit"])]
}
异步数据库集成
SQLAlchemy异步操作
使用SQLAlchemy 1.4+的异步支持进行数据库操作。
异步数据库配置:
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, DateTime, func
# 数据库配置
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
# 创建异步引擎
engine = create_async_engine(
DATABASE_URL,
echo=True, # 开发时显示SQL语句
pool_size=20,
max_overflow=0
)
# 异步会话工厂
AsyncSessionLocal = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
Base = declarative_base()
# 异步数据模型
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, index=True)
email = Column(String(100), unique=True, index=True)
hashed_password = Column(String(100))
created_at = Column(DateTime, server_default=func.now())
# 数据库依赖
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
# 异步CRUD操作
from sqlalchemy.future import select
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class UserCRUD:
@staticmethod
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
@staticmethod
def get_password_hash(password):
return pwd_context.hash(password)
@staticmethod
async def get_user_by_email(db: AsyncSession, email: str):
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
@staticmethod
async def create_user(db: AsyncSession, user_data: UserCreate):
hashed_password = UserCRUD.get_password_hash(user_data.password)
db_user = User(
username=user_data.username,
email=user_data.email,
hashed_password=hashed_password
)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
return db_user
# 使用异步CRUD的路由
@app.post("/async-users/", response_model=UserResponse)
async def create_async_user(
user: UserCreate,
db: AsyncSession = Depends(get_db)
):
# 检查用户是否已存在
existing_user = await UserCRUD.get_user_by_email(db, user.email)
if existing_user:
raise HTTPException(
status_code=400,
detail="邮箱已被注册"
)
# 创建新用户
db_user = await UserCRUD.create_user(db, user)
return db_user
Redis异步缓存集成
import redis.asyncio as redis
from fastapi import BackgroundTasks
# Redis连接池
redis_pool = redis.ConnectionPool.from_url(
"redis://localhost:6379",
decode_responses=True
)
async def get_redis():
return redis.Redis(connection_pool=redis_pool)
# 缓存装饰器
def cache_response(ttl: int = 300):
def decorator(func):
async def wrapper(*args, **kwargs):
redis_client = await get_redis()
cache_key = f"cache:{func.__name__}:{str(args)}:{str(kwargs)}"
# 尝试从缓存获取
cached_data = await redis_client.get(cache_key)
if cached_data:
return cached_data
# 执行原函数
result = await func(*args, **kwargs)
# 异步缓存结果
await redis_client.setex(cache_key, ttl, str(result))
return result
return wrapper
return decorator
# 使用缓存的接口
@app.get("/cached-data/")
@cache_response(ttl=60) # 缓存60秒
async def get_cached_data():
# 模拟耗时操作
await asyncio.sleep(2)
return {"data": "这是缓存的数据", "timestamp": datetime.utcnow()}
WebSocket实时通信
双向实时通信实现
from fastapi import WebSocket, WebSocketDisconnect
from typing import List
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
try:
await connection.send_text(message)
except Exception:
# 处理断开连接的客户端
self.disconnect(connection)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
# 接收客户端消息
data = await websocket.receive_text()
# 处理消息并广播
response = f"客户端{client_id}说: {data}"
await manager.broadcast(response)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"客户端{client_id}已断开连接")
# 实时数据推送示例
import json
import asyncio
@app.websocket("/ws/stock/{symbol}")
async def stock_updates(websocket: WebSocket, symbol: str):
await websocket.accept()
try:
while True:
# 模拟实时股票数据
import random
price_data = {
"symbol": symbol,
"price": round(100 + random.uniform(-5, 5), 2),
"timestamp": datetime.utcnow().isoformat(),
"volume": random.randint(1000, 10000)
}
await websocket.send_json(price_data)
await asyncio.sleep(1) # 每秒推送一次
except WebSocketDisconnect:
print(f"股票 {symbol} 的WebSocket连接已关闭")
高性能中间件与背景任务
自定义中间件开发
from fastapi import Request
import time
import uuid
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
# 请求唯一标识
request_id = str(uuid.uuid4())
request.state.request_id = request_id
start_time = time.time()
# 处理请求
response = await call_next(request)
# 计算处理时间
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
response.headers["X-Request-ID"] = request_id
# 记录日志
print(f"请求 {request.method} {request.url} 处理耗时: {process_time:.4f}秒")
return response
# 速率限制中间件
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get("/limited/")
@limiter.limit("5/minute")
async def limited_endpoint(request: Request):
return {"message": "这是一个速率受限的接口"}
# 背景任务处理
from fastapi import BackgroundTasks
async def send_notification(email: str, message: str):
# 模拟发送邮件耗时
await asyncio.sleep(2)
print(f"已发送通知到 {email}: {message}")
@app.post("/notify/{email}")
async def send_notification_endpoint(
email: str,
message: str,
background_tasks: BackgroundTasks
):
background_tasks.add_task(send_notification, email, message)
return {"message": "通知已加入后台任务队列"}
测试与部署策略
异步测试框架
import pytest
from httpx import AsyncClient
from .main import app
@pytest.fixture
async def async_client():
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
@pytest.mark.asyncio
async def test_root_endpoint(async_client):
response = await async_client.get("/")
assert response.status_code == 200
data = response.json()
assert "message" in data
assert "timestamp" in data
@pytest.mark.asyncio
async def test_user_creation(async_client):
user_data = {
"username": "testuser",
"email": "test@example.com",
"password": "securepassword123"
}
response = await async_client.post("/users/", json=user_data)
assert response.status_code == 200
user_response = response.json()
assert user_response["username"] == user_data["username"]
assert user_response["email"] == user_data["email"]
assert "id" in user_response
# 数据库测试夹具
@pytest.fixture
async def test_db():
# 创建测试数据库
test_engine = create_async_engine("sqlite+aiosqlite:///./test.db")
async with test_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async_session = sessionmaker(
test_engine, class_=AsyncSession, expire_on_commit=False
)
yield async_session
# 清理测试数据库
async with test_engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
生产环境部署
Docker配置:
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# 安装uvicorn和gunicorn
RUN pip install uvicorn[standard] gunicorn
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["gunicorn", "main:app", "--workers", "4", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
性能监控配置:
from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Response
# 定义指标
REQUEST_COUNT = Counter('request_count', 'App Request Count', ['method', 'endpoint'])
REQUEST_LATENCY = Histogram('request_latency_seconds', 'Request latency', ['method', 'endpoint'])
@app.middleware("http")
async def monitor_requests(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
# 记录指标
REQUEST_COUNT.labels(method=request.method, endpoint=request.url.path).inc()
REQUEST_LATENCY.labels(method=request.method, endpoint=request.url.path).observe(
time.time() - start_time
)
return response
@app.get("/metrics")
async def metrics():
return Response(generate_latest())
最佳实践与性能优化
连接池管理
from databases import Database
# 使用databases库进行连接池管理
database = Database(DATABASE_URL)
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
# 使用连接池的查询
@app.get("/optimized-users/")
async def get_optimized_users(skip: int = 0, limit: int = 100):
query = "SELECT * FROM users ORDER BY id LIMIT :limit OFFSET :skip"
users = await database.fetch_all(query, values={"limit": limit, "skip": skip})
return users
异步任务队列集成
import asyncio
from concurrent.futures import ThreadPoolExecutor
# 创建线程池处理CPU密集型任务
thread_pool = ThreadPoolExecutor(max_workers=4)
async def run_cpu_intensive_task(data):
loop = asyncio.get_event_loop()
# 将CPU密集型任务放到线程池执行
result = await loop.run_in_executor(
thread_pool,
lambda: cpu_intensive_processing(data)
)
return result
def cpu_intensive_processing(data):
# 模拟CPU密集型计算
import hashlib
return hashlib.sha256(data.encode()).hexdigest()
@app.post("/process-data/")
async def process_data_endpoint(data: str):
result = await run_cpu_intensive_task(data)
return {"hash": result}
这个完整的FastAPI异步开发指南涵盖了从基础概念到生产部署的全方位内容。通过合理运用异步编程模式、连接池管理、缓存策略和实时通信技术,可以构建出高性能、可扩展的现代Web应用程序。在实际项目中,建议根据具体需求选择合适的组件和优化策略,持续监控和调整系统性能。