首页 > Python基础教程 >
-
FastAPI依赖注入与上下文管理
-
全局依赖配置原理与实现
1.1 全局依赖的核心作用
全局依赖是FastAPI实现跨路由通用逻辑的关键机制,其核心作用包括:
统一处理认证鉴权
标准化响应格式
集中收集请求日志
管理数据库会话生命周期
实施统一速率限制
from fastapi import Depends, FastAPI, Header
app = FastAPI()
async def verify_token(authorization: str = Header(...)):
if not authorization.startswith("Bearer "):
raise HTTPException(status_code=401)
return authorization[7:]
app = FastAPI(dependencies=[Depends(verify_token)])
1.2 多层级依赖配置
FastAPI支持灵活的依赖注入层级:
层级类型 作用范围 典型应用场景
全局依赖 所有路由 身份认证、请求日志
路由组依赖 指定路由组 API版本控制、权限分级
单路由依赖 单个路由 特殊参数校验、业务级权限
1.3 数据库会话实战案例
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/db"
engine = create_async_engine(DATABASE_URL)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
@asynccontextmanager
async def lifespan(app: FastAPI):
# 应用启动时执行
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
# 应用关闭时执行
await engine.dispose()
async def get_db():
async with async_session() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
app = FastAPI(lifespan=lifespan, dependencies=[Depends(get_db)])
-
应用生命周期管理
2.1 生命周期事件实战
from fastapi import FastAPI
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# 启动时初始化Redis连接池
app.state.redis = await create_redis_pool()
yield
# 关闭时释放资源
await app.state.redis.close()
app = FastAPI(lifespan=lifespan)
2.2 全局状态管理
from fastapi import FastAPI, Request
app = FastAPI()
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
# 记录到全局状态
request.app.state.request_count += 1
return response
- 综合应用案例:电商系统架构
from fastapi import APIRouter, Depends
from pydantic import BaseModel
class ProductCreate(BaseModel):
name: str
price: float
stock: int
router = APIRouter(prefix="/products")
@router.post("")
async def create_product(
product_data: ProductCreate,
db: AsyncSession = Depends(get_db),
redis=Depends(get_redis)
):
# 检查商品名称重复
existing = await db.execute(
select(Product).filter(Product.name == product_data.name)
)
if existing.scalar():
raise HTTPException(400, "Product name exists")
# 写入数据库
new_product = Product(**product_data.dict())
db.add(new_product)
await db.commit()
# 更新缓存
await redis.delete("product_list")
return {"id": new_product.id}
课后Quiz
Q1:当遇到数据库连接池耗尽问题时,应该如何排查?
A. 检查数据库服务器状态
B. 增加连接池最大连接数
C. 检查是否忘记释放会话
D. 所有以上选项
正确答案:D。连接池问题需要综合排查,包括服务器资源、配置参数和代码逻辑。
Q2:为什么推荐使用yield方式管理数据库会话?
A. 实现事务的自动提交
B. 确保异常时回滚事务
C. 自动关闭会话连接
D. 所有以上选项
正确答案:D。yield语法可以完美实现会话的生命周期管理。
常见报错解决方案
错误1:RuntimeError: No response returned.
原因:依赖项中未正确返回响应
解决:
async def auth_dependency():
try:
# 验证逻辑
yield
except Exception as e:
return JSONResponse(status_code=401, content={"error": str(e)})
错误2:sqlalchemy.exc.InterfaceError: Connection closed unexpectedly
原因:数据库连接超时
预防:
engine = create_async_engine(
DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_timeout=30
)
错误3:pydantic.error_wrappers.ValidationError
原因:请求体数据验证失败
排查步骤:
检查请求头Content-Type是否正确
验证请求体JSON格式
检查Pydantic模型定义
使用curl测试请求:
curl -X POST http://localhost:8000/items \
-H "Content-Type: application/json" \
-d '{"name":"example", "price": 9.99}'
来源:https://www.cnblogs.com/Amd794/p/18811963