首页 > Python基础教程 >
-
FastAPI中的依赖注入与数据库事务管理
探索数千个预构建的 AI 应用,开启你的下一个伟大创意
依赖注入基础与数据库会话封装
(代码示例运行环境:Python 3.8+,需安装fastapi, uvicorn, sqlalchemy, asyncpg)
from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
# 初始化数据库连接(使用异步引擎)
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session_maker = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# 封装数据库会话依赖
async def get_db() -> AsyncSession:
"""
生成器函数创建数据库会话上下文
使用yield代替return实现资源自动释放
会话自动关闭机制保证连接池回收
"""
async with async_session_maker() as session:
try:
yield session
finally:
await session.close()
app = FastAPI()
@app.post("/users/")
async def create_user(
name: str,
session: AsyncSession = Depends(get_db)
):
"""
路由函数通过Depends自动获取数据库会话
事务管理需要在业务逻辑中显式控制
注意异步await关键字的正确使用
"""
from sqlalchemy import text
try:
# 执行原生SQL示例(实际建议使用ORM)
await session.execute(
text("INSERT INTO users (name) VALUES (:name)"),
{"name": name}
)
await session.commit()
return {"status": "created"}
except Exception as e:
await session.rollback()
raise HTTPException(500, str(e))
事务管理的三种实现模式
(1)自动事务模式(适合简单操作):
from fastapi import Depends
from databases import Database
async def transaction_wrapper(db: Database = Depends(get_db)):
async with db.transaction():
yield
(2)手动控制模式(复杂业务场景):
@app.post("/orders/")
async def create_order(
user_id: int,
db: AsyncSession = Depends(get_db)
):
try:
await db.begin()
# 执行多个数据库操作
await db.commit()
except SQLAlchemyError:
await db.rollback()
raise
(3)装饰器模式(代码复用最佳实践):
from contextlib import asynccontextmanager
@asynccontextmanager
async def managed_transaction(db: AsyncSession):
try:
yield
await db.commit()
except Exception:
await db.rollback()
raise
# 在路由中使用
async def create_order(db: AsyncSession = Depends(get_db)):
async with managed_transaction(db):
# 业务逻辑代码
完整案例:用户注册连带创建档案
(包含事务管理和错误处理的最佳实践)
from sqlalchemy import insert
from pydantic import BaseModel
class UserCreate(BaseModel):
username: str
email: str
profile: dict
@app.post("/register/")
async def register_user(
user_data: UserCreate,
db: AsyncSession = Depends(get_db)
):
async with db.begin():
try:
# 插入用户主表
user_result = await db.execute(
insert(users_table).values(
username=user_data.username,
email=user_data.email
).returning(users_table.c.id)
)
user_id = user_result.scalar()
# 插入档案子表
await db.execute(
insert(profiles_table).values(
user_id=user_id,
**user_data.profile
)
)
return {"user_id": user_id}
except IntegrityError as e:
await db.rollback()
if "unique constraint" in str(e):
raise HTTPException(400, "Username already exists")
raise HTTPException(500, "Database error")
课后Quiz:
Q1:使用原生SQL查询时,如何防止SQL注入攻击?
A) 直接拼接字符串
B) 使用参数化查询
C) 过滤特殊字符
D) 使用ORM自动处理
正确答案:B
解析:参数化查询通过将用户输入与SQL语句分离的方式,从根本上阻止注入攻击。示例中的text()
函数配合参数字典即为正确做法。即使用ORM,也需要避免直接拼接查询字符串。
常见报错解决方案:
错误现象:
sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions.ConnectionDoesNotExistError'>
原因分析:
数据库连接参数配置错误
连接池耗尽未正确释放
异步上下文管理不当
解决步骤:
检查DATABASE_URL格式:postgresql+asyncpg://
确保数据库服务正常运行
在依赖项中正确使用async with管理会话生命周期
调整连接池设置:
engine = create_async_engine(
DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_timeout=30
)
本文作者: Amd794
本文链接: https://www.cnblogs.com/Amd794/p/18815765