SQLAlchemy 2.x 学习笔记
简介
SQLAlchemy 是 Python 中最流行的 ORM(对象关系映射)库,它提供了高效、灵活的数据库操作方式。本文基于 SQLAlchemy 2.x 版本,结合 MySQL 数据库,记录学习过程中的关键知识点和实用技巧。
安装配置
首先安装 SQLAlchemy 和 MySQL 驱动:
pip install sqlalchemy==2.0.23
pip install mysqlclient
为什么需要单独安装MySQL驱动?
SQLAlchemy 本身是一个数据库抽象层和ORM工具,它不包含与具体数据库系统通信的驱动程序。这种设计遵循了关注点分离的原则,带来以下优势:
- 灵活性:用户可以根据需求选择不同的数据库驱动,如
mysqlclient
、PyMySQL
或mysql-connector-python
- 维护效率:SQLAlchemy团队可以专注于ORM功能开发,而不必维护所有数据库的连接驱动
- 版本独立性:数据库驱动可以独立升级,不受SQLAlchemy版本限制
- 体积优化:避免安装不需要的驱动,减小项目依赖体积
对于MySQL,常用的驱动选择有:
- mysqlclient:C扩展驱动,性能最佳,推荐生产环境使用
- PyMySQL:纯Python实现,安装简单但性能较低
- mysql-connector-python:Oracle官方驱动,纯Python实现
在连接字符串中,驱动名称通过 dialect+driver
格式指定,如 mysql+mysqlclient
、mysql+pymysql
等。
MySQL驱动性能比较
不同驱动在性能上存在显著差异,以下是一个简要比较:
驱动 | 类型 | 性能 | 安装难度 | 适用场景 |
---|---|---|---|---|
mysqlclient | C扩展 | 最佳 | 中等(需要C编译器) | 生产环境,高性能要求 |
PyMySQL | 纯Python | 中等 | 简单 | 开发环境,简单应用 |
mysql-connector-python | 纯Python | 较低 | 简单 | 需要官方支持的场景 |
常见安装问题
mysqlclient安装问题:
在Windows上安装mysqlclient可能遇到编译错误,解决方法:
# 使用预编译的二进制wheel包
pip install mysqlclient --only-binary :all:
或者在Linux上:
# Ubuntu/Debian
sudo apt-get install python3-dev default-libmysqlclient-dev build-essential
# CentOS/RHEL
sudo yum install python3-devel mysql-devel gcc
数据库连接
SQLAlchemy 2.x 使用新的连接方式,推荐使用 URL 对象创建引擎:
from sqlalchemy import create_engine, URL
from sqlalchemy.orm import sessionmaker
# 创建连接 URL(使用mysqlclient驱动)
url_object = URL.create(
"mysql+mysqldb", # 指定数据库类型和驱动名称
username="user",
password="password",
host="localhost",
port=3306,
database="testdb",
)
# 创建引擎
engine = create_engine(url_object, echo=True)
# 创建会话工厂
Session = sessionmaker(bind=engine)
也可以使用连接字符串:
# 使用mysqlclient驱动(推荐)
engine = create_engine("mysql+mysqldb://user:password@localhost:3306/testdb", echo=True)
# 使用PyMySQL驱动
# engine = create_engine("mysql+pymysql://user:password@localhost:3306/testdb", echo=True)
# 使用mysql-connector-python驱动
# engine = create_engine("mysql+mysqlconnector://user:password@localhost:3306/testdb", echo=True)
参数说明:
echo=True
:输出 SQL 语句,方便调试pool_size
:连接池大小pool_timeout
:连接超时时间pool_recycle
:连接回收时间
驱动特定配置选项
不同的MySQL驱动支持特定的连接参数,可以通过 connect_args
参数传递:
# mysqlclient特定选项
engine = create_engine(
"mysql+mysqldb://user:password@localhost/testdb",
connect_args={
"ssl": {
"ca": "/path/to/ca-cert.pem",
"cert": "/path/to/client-cert.pem",
"key": "/path/to/client-key.pem"
},
"charset": "utf8mb4",
"use_unicode": True,
"autocommit": False
}
)
# PyMySQL特定选项
engine = create_engine(
"mysql+pymysql://user:password@localhost/testdb",
connect_args={
"charset": "utf8mb4",
"read_timeout": 10, # 秒
"write_timeout": 10, # 秒
"ssl_ca": "/path/to/ca-cert.pem"
}
)
声明模型
SQLAlchemy 2.x 推荐使用声明式映射和类型注解:
from typing import List, Optional
from sqlalchemy import String, Integer, ForeignKey, Text, DateTime, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
# 定义基类
class Base(DeclarativeBase):
pass
# 用户模型
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
email: Mapped[str] = mapped_column(String(100), unique=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())
# 关系:一个用户可以有多篇文章
posts: Mapped[List["Post"]] = relationship(back_populates="author")
def __repr__(self) -> str:
return f"<User(username={self.username}, email={self.email})>"
# 文章模型
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200), nullable=False)
content: Mapped[str] = mapped_column(Text)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())
# 关系:多篇文章属于一个用户
author: Mapped["User"] = relationship(back_populates="posts")
def __repr__(self) -> str:
return f"<Post(title={self.title})>"
创建表
# 创建所有表
Base.metadata.create_all(engine)
# 删除所有表
# Base.metadata.drop_all(engine)
会话管理
SQLAlchemy 2.x 推荐使用上下文管理器处理会话:
# 创建会话
with Session() as session:
# 数据库操作
user = User(username="test_user", email="test@example.com")
session.add(user)
session.commit()
基本操作
插入数据
with Session() as session:
# 创建用户
user1 = User(username="alice", email="alice@example.com")
user2 = User(username="bob", email="bob@example.com")
# 添加到会话
session.add_all([user1, user2])
# 提交事务
session.commit()
# 创建文章并关联用户
post1 = Post(
title="SQLAlchemy 入门",
content="这是一篇关于 SQLAlchemy 的入门文章",
user_id=user1.id
)
post2 = Post(
title="Python Web 开发",
content="使用 Python 进行 Web 开发的技巧",
user_id=user1.id
)
session.add_all([post1, post2])
session.commit()
查询数据
SQLAlchemy 2.x 引入了新的查询 API,使用 select
语句:
from sqlalchemy import select
with Session() as session:
# 查询所有用户
stmt = select(User)
users = session.execute(stmt).scalars().all()
for user in users:
print(user)
# 条件查询
stmt = select(User).where(User.username == "alice")
user = session.execute(stmt).scalar_one_or_none()
if user:
print(f"找到用户: {user.username}")
# 排序
stmt = select(Post).order_by(Post.created_at.desc())
posts = session.execute(stmt).scalars().all()
for post in posts:
print(f"{post.title} - {post.created_at}")
# 限制结果数量
stmt = select(Post).limit(5).offset(0)
posts = session.execute(stmt).scalars().all()
# 连接查询
stmt = select(User, Post).join(Post, User.id == Post.user_id)
results = session.execute(stmt).all()
for user, post in results:
print(f"{user.username}: {post.title}")
更新数据
with Session() as session:
# 方法一:查询后修改
user = session.execute(select(User).where(User.username == "alice")).scalar_one()
user.email = "alice_new@example.com"
session.commit()
# 方法二:直接执行更新
from sqlalchemy import update
stmt = update(User).where(User.username == "bob").values(email="bob_new@example.com")
session.execute(stmt)
session.commit()
删除数据
with Session() as session:
# 方法一:查询后删除
post = session.execute(select(Post).where(Post.id == 1)).scalar_one()
session.delete(post)
session.commit()
# 方法二:直接执行删除
from sqlalchemy import delete
stmt = delete(Post).where(Post.id == 2)
session.execute(stmt)
session.commit()
高级查询
聚合函数
from sqlalchemy import func, desc
with Session() as session:
# 计数
stmt = select(func.count()).select_from(User)
count = session.execute(stmt).scalar()
print(f"用户总数: {count}")
# 分组统计
stmt = select(User.id, func.count(Post.id).label("post_count")).\
join(Post).\
group_by(User.id).\
order_by(desc("post_count"))
results = session.execute(stmt).all()
for user_id, post_count in results:
print(f"用户 ID {user_id} 发表了 {post_count} 篇文章")
子查询
with Session() as session:
# 查找发表文章数量最多的用户
subq = select(Post.user_id, func.count(Post.id).label("post_count")).\
group_by(Post.user_id).subquery()
stmt = select(User, subq.c.post_count).\
join(subq, User.id == subq.c.user_id).\
order_by(desc("post_count"))
results = session.execute(stmt).all()
for user, post_count in results:
print(f"{user.username} 发表了 {post_count} 篇文章")
关系和加载策略
关系类型
- 一对多关系(如上面的 User 和 Post)
- 多对一关系(Post 到 User 的反向关系)
- 多对多关系:
# 定义中间表
post_tag = Table(
"post_tags",
Base.metadata,
Column("post_id", ForeignKey("posts.id"), primary_key=True),
Column("tag_id", ForeignKey("tags.id"), primary_key=True)
)
class Tag(Base):
__tablename__ = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
# 多对多关系
posts: Mapped[List["Post"]] = relationship(secondary=post_tag, back_populates="tags")
# 在 Post 类中添加
class Post(Base):
# ... 其他属性 ...
tags: Mapped[List["Tag"]] = relationship(secondary=post_tag, back_populates="posts")
加载策略
from sqlalchemy.orm import joinedload, selectinload, subqueryload
with Session() as session:
# 懒加载(默认)
user = session.execute(select(User).where(User.id == 1)).scalar_one()
# 访问时才加载关联数据
for post in user.posts:
print(post.title)
# 急加载 - Joined Load
stmt = select(User).options(joinedload(User.posts)).where(User.id == 1)
user = session.execute(stmt).unique().scalar_one()
# 已预加载关联数据
for post in user.posts:
print(post.title)
# SelectIn Load(适合集合)
stmt = select(User).options(selectinload(User.posts))
users = session.execute(stmt).unique().scalars().all()
# Subquery Load
stmt = select(User).options(subqueryload(User.posts))
users = session.execute(stmt).unique().scalars().all()
事务管理
from sqlalchemy.exc import IntegrityError
# 显式事务
with Session() as session:
try:
# 开始事务
user = User(username="transaction_test", email="test@example.com")
session.add(user)
post = Post(title="事务测试", content="测试内容", user_id=user.id)
session.add(post)
# 提交事务
session.commit()
except IntegrityError:
# 回滚事务
session.rollback()
print("事务失败,已回滚")
原生 SQL
with Session() as session:
# 执行原生 SQL 查询
result = session.execute("SELECT * FROM users WHERE username LIKE :pattern",
{"pattern": "%alice%"})
for row in result:
print(row)
# 使用 text 构造 SQL
from sqlalchemy import text
stmt = text("SELECT * FROM posts WHERE title LIKE :title")
result = session.execute(stmt, {"title": "%SQLAlchemy%"})
for row in result:
print(row)
性能优化技巧
- 合理使用加载策略:根据查询需求选择合适的加载策略,避免 N+1 查询问题。
- 批量操作:使用
bulk_insert_mappings
、bulk_update_mappings
和bulk_save_objects
进行批量操作。
with Session() as session:
# 批量插入
users_data = [
{"username": f"user_{i}", "email": f"user_{i}@example.com"}
for i in range(100)
]
session.bulk_insert_mappings(User, users_data)
session.commit()
- 使用索引:在数据库层面创建适当的索引。
from sqlalchemy import Index
# 在模型中定义索引
class User(Base):
__tablename__ = "users"
# ... 属性定义 ...
# 定义索引
__table_args__ = (
Index("idx_username", "username"),
Index("idx_email", "email"),
)
- 使用
yield_per
:处理大量数据时分批获取结果。
with Session() as session:
# 分批处理大量数据
stmt = select(User)
for user in session.scalars(stmt).yield_per(100):
# 每次处理 100 条记录
print(user.username)
常见问题与解决方案
1. 会话过期问题
# 设置会话不自动过期对象
Session = sessionmaker(bind=engine, expire_on_commit=False)
2. 懒加载异常
# 在会话外访问关系属性时可能出现的问题
with Session() as session:
user = session.execute(select(User).where(User.id == 1)).scalar_one()
# 会话已关闭,访问关系属性会抛出异常
# 解决方法:使用急加载或在会话内完成所有操作
with Session() as session:
user = session.execute(
select(User).options(joinedload(User.posts)).where(User.id == 1)
).scalar_one()
# 在会话内处理数据或将需要的数据提取出来
posts_data = [(post.id, post.title) for post in user.posts]
# 会话外可以安全访问 posts_data
3. 连接池配置
# 配置连接池以处理长时间运行的应用
engine = create_engine(
url_object,
pool_size=10, # 连接池大小
max_overflow=20, # 超出 pool_size 后的最大连接数
pool_timeout=30, # 获取连接的超时时间
pool_recycle=1800, # 连接回收时间(秒)
pool_pre_ping=True # 使用前检查连接是否有效
)
评论区