目 录CONTENT

文章目录

SQLAlchemy 2.x 学习笔记

~梓
2025-07-04 / 0 评论 / 0 点赞 / 3 阅读 / 0 字
温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

SQLAlchemy 2.x 学习笔记

简介

SQLAlchemy 是 Python 中最流行的 ORM(对象关系映射)库,它提供了高效、灵活的数据库操作方式。本文基于 SQLAlchemy 2.x 版本,结合 MySQL 数据库,记录学习过程中的关键知识点和实用技巧。

安装配置

首先安装 SQLAlchemy 和 MySQL 驱动:

pip install sqlalchemy==2.0.23
pip install mysqlclient

为什么需要单独安装MySQL驱动?

SQLAlchemy 本身是一个数据库抽象层和ORM工具,它不包含与具体数据库系统通信的驱动程序。这种设计遵循了关注点分离的原则,带来以下优势:

  1. 灵活性:用户可以根据需求选择不同的数据库驱动,如 mysqlclientPyMySQLmysql-connector-python
  2. 维护效率:SQLAlchemy团队可以专注于ORM功能开发,而不必维护所有数据库的连接驱动
  3. 版本独立性:数据库驱动可以独立升级,不受SQLAlchemy版本限制
  4. 体积优化:避免安装不需要的驱动,减小项目依赖体积

对于MySQL,常用的驱动选择有:

  • mysqlclient:C扩展驱动,性能最佳,推荐生产环境使用
  • PyMySQL:纯Python实现,安装简单但性能较低
  • mysql-connector-python:Oracle官方驱动,纯Python实现

在连接字符串中,驱动名称通过 dialect+driver格式指定,如 mysql+mysqlclientmysql+pymysql等。

MySQL驱动性能比较

不同驱动在性能上存在显著差异,以下是一个简要比较:

驱动 类型 性能 安装难度 适用场景
mysqlclient C扩展 最佳 中等(需要C编译器) 生产环境,高性能要求
PyMySQL 纯Python 中等 简单 开发环境,简单应用
mysql-connector-python 纯Python 较低 简单 需要官方支持的场景

常见安装问题

mysqlclient安装问题

在Windows上安装mysqlclient可能遇到编译错误,解决方法:

# 使用预编译的二进制wheel包
pip install mysqlclient --only-binary :all:

或者在Linux上:

# Ubuntu/Debian
sudo apt-get install python3-dev default-libmysqlclient-dev build-essential

# CentOS/RHEL
sudo yum install python3-devel mysql-devel gcc

数据库连接

SQLAlchemy 2.x 使用新的连接方式,推荐使用 URL 对象创建引擎:

from sqlalchemy import create_engine, URL
from sqlalchemy.orm import sessionmaker

# 创建连接 URL(使用mysqlclient驱动)
url_object = URL.create(
    "mysql+mysqldb",  # 指定数据库类型和驱动名称
    username="user",
    password="password",
    host="localhost",
    port=3306,
    database="testdb",
)

# 创建引擎
engine = create_engine(url_object, echo=True)

# 创建会话工厂
Session = sessionmaker(bind=engine)

也可以使用连接字符串:

# 使用mysqlclient驱动(推荐)
engine = create_engine("mysql+mysqldb://user:password@localhost:3306/testdb", echo=True)

# 使用PyMySQL驱动
# engine = create_engine("mysql+pymysql://user:password@localhost:3306/testdb", echo=True)

# 使用mysql-connector-python驱动
# engine = create_engine("mysql+mysqlconnector://user:password@localhost:3306/testdb", echo=True)

参数说明:

  • echo=True:输出 SQL 语句,方便调试
  • pool_size:连接池大小
  • pool_timeout:连接超时时间
  • pool_recycle:连接回收时间

驱动特定配置选项

不同的MySQL驱动支持特定的连接参数,可以通过 connect_args参数传递:

# mysqlclient特定选项
engine = create_engine(
    "mysql+mysqldb://user:password@localhost/testdb",
    connect_args={
        "ssl": {
            "ca": "/path/to/ca-cert.pem",
            "cert": "/path/to/client-cert.pem",
            "key": "/path/to/client-key.pem"
        },
        "charset": "utf8mb4",
        "use_unicode": True,
        "autocommit": False
    }
)

# PyMySQL特定选项
engine = create_engine(
    "mysql+pymysql://user:password@localhost/testdb",
    connect_args={
        "charset": "utf8mb4",
        "read_timeout": 10,  # 秒
        "write_timeout": 10,  # 秒
        "ssl_ca": "/path/to/ca-cert.pem"
    }
)

声明模型

SQLAlchemy 2.x 推荐使用声明式映射和类型注解:

from typing import List, Optional
from sqlalchemy import String, Integer, ForeignKey, Text, DateTime, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

# 定义基类
class Base(DeclarativeBase):
    pass

# 用户模型
class User(Base):
    __tablename__ = "users"
  
    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
    email: Mapped[str] = mapped_column(String(100), unique=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())
  
    # 关系:一个用户可以有多篇文章
    posts: Mapped[List["Post"]] = relationship(back_populates="author")
  
    def __repr__(self) -> str:
        return f"<User(username={self.username}, email={self.email})>"

# 文章模型
class Post(Base):
    __tablename__ = "posts"
  
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    content: Mapped[str] = mapped_column(Text)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    created_at: Mapped[datetime] = mapped_column(DateTime, default=func.now())
  
    # 关系:多篇文章属于一个用户
    author: Mapped["User"] = relationship(back_populates="posts")
  
    def __repr__(self) -> str:
        return f"<Post(title={self.title})>"

创建表

# 创建所有表
Base.metadata.create_all(engine)

# 删除所有表
# Base.metadata.drop_all(engine)

会话管理

SQLAlchemy 2.x 推荐使用上下文管理器处理会话:

# 创建会话
with Session() as session:
    # 数据库操作
    user = User(username="test_user", email="test@example.com")
    session.add(user)
    session.commit()

基本操作

插入数据

with Session() as session:
    # 创建用户
    user1 = User(username="alice", email="alice@example.com")
    user2 = User(username="bob", email="bob@example.com")
  
    # 添加到会话
    session.add_all([user1, user2])
  
    # 提交事务
    session.commit()
  
    # 创建文章并关联用户
    post1 = Post(
        title="SQLAlchemy 入门",
        content="这是一篇关于 SQLAlchemy 的入门文章",
        user_id=user1.id
    )
  
    post2 = Post(
        title="Python Web 开发",
        content="使用 Python 进行 Web 开发的技巧",
        user_id=user1.id
    )
  
    session.add_all([post1, post2])
    session.commit()

查询数据

SQLAlchemy 2.x 引入了新的查询 API,使用 select 语句:

from sqlalchemy import select

with Session() as session:
    # 查询所有用户
    stmt = select(User)
    users = session.execute(stmt).scalars().all()
    for user in users:
        print(user)
  
    # 条件查询
    stmt = select(User).where(User.username == "alice")
    user = session.execute(stmt).scalar_one_or_none()
    if user:
        print(f"找到用户: {user.username}")
  
    # 排序
    stmt = select(Post).order_by(Post.created_at.desc())
    posts = session.execute(stmt).scalars().all()
    for post in posts:
        print(f"{post.title} - {post.created_at}")
  
    # 限制结果数量
    stmt = select(Post).limit(5).offset(0)
    posts = session.execute(stmt).scalars().all()
  
    # 连接查询
    stmt = select(User, Post).join(Post, User.id == Post.user_id)
    results = session.execute(stmt).all()
    for user, post in results:
        print(f"{user.username}: {post.title}")

更新数据

with Session() as session:
    # 方法一:查询后修改
    user = session.execute(select(User).where(User.username == "alice")).scalar_one()
    user.email = "alice_new@example.com"
    session.commit()
  
    # 方法二:直接执行更新
    from sqlalchemy import update
    stmt = update(User).where(User.username == "bob").values(email="bob_new@example.com")
    session.execute(stmt)
    session.commit()

删除数据

with Session() as session:
    # 方法一:查询后删除
    post = session.execute(select(Post).where(Post.id == 1)).scalar_one()
    session.delete(post)
    session.commit()
  
    # 方法二:直接执行删除
    from sqlalchemy import delete
    stmt = delete(Post).where(Post.id == 2)
    session.execute(stmt)
    session.commit()

高级查询

聚合函数

from sqlalchemy import func, desc

with Session() as session:
    # 计数
    stmt = select(func.count()).select_from(User)
    count = session.execute(stmt).scalar()
    print(f"用户总数: {count}")
  
    # 分组统计
    stmt = select(User.id, func.count(Post.id).label("post_count")).\
           join(Post).\
           group_by(User.id).\
           order_by(desc("post_count"))
    results = session.execute(stmt).all()
    for user_id, post_count in results:
        print(f"用户 ID {user_id} 发表了 {post_count} 篇文章")

子查询

with Session() as session:
    # 查找发表文章数量最多的用户
    subq = select(Post.user_id, func.count(Post.id).label("post_count")).\
           group_by(Post.user_id).subquery()
  
    stmt = select(User, subq.c.post_count).\
           join(subq, User.id == subq.c.user_id).\
           order_by(desc("post_count"))
  
    results = session.execute(stmt).all()
    for user, post_count in results:
        print(f"{user.username} 发表了 {post_count} 篇文章")

关系和加载策略

关系类型

  1. 一对多关系(如上面的 User 和 Post)
  2. 多对一关系(Post 到 User 的反向关系)
  3. 多对多关系:
# 定义中间表
post_tag = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", ForeignKey("tags.id"), primary_key=True)
)

class Tag(Base):
    __tablename__ = "tags"
  
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)
  
    # 多对多关系
    posts: Mapped[List["Post"]] = relationship(secondary=post_tag, back_populates="tags")

# 在 Post 类中添加
class Post(Base):
    # ... 其他属性 ...
    tags: Mapped[List["Tag"]] = relationship(secondary=post_tag, back_populates="posts")

加载策略

from sqlalchemy.orm import joinedload, selectinload, subqueryload

with Session() as session:
    # 懒加载(默认)
    user = session.execute(select(User).where(User.id == 1)).scalar_one()
    # 访问时才加载关联数据
    for post in user.posts:
        print(post.title)
  
    # 急加载 - Joined Load
    stmt = select(User).options(joinedload(User.posts)).where(User.id == 1)
    user = session.execute(stmt).unique().scalar_one()
    # 已预加载关联数据
    for post in user.posts:
        print(post.title)
  
    # SelectIn Load(适合集合)
    stmt = select(User).options(selectinload(User.posts))
    users = session.execute(stmt).unique().scalars().all()
  
    # Subquery Load
    stmt = select(User).options(subqueryload(User.posts))
    users = session.execute(stmt).unique().scalars().all()

事务管理

from sqlalchemy.exc import IntegrityError

# 显式事务
with Session() as session:
    try:
        # 开始事务
        user = User(username="transaction_test", email="test@example.com")
        session.add(user)
    
        post = Post(title="事务测试", content="测试内容", user_id=user.id)
        session.add(post)
    
        # 提交事务
        session.commit()
    except IntegrityError:
        # 回滚事务
        session.rollback()
        print("事务失败,已回滚")

原生 SQL

with Session() as session:
    # 执行原生 SQL 查询
    result = session.execute("SELECT * FROM users WHERE username LIKE :pattern", 
                            {"pattern": "%alice%"})
    for row in result:
        print(row)
  
    # 使用 text 构造 SQL
    from sqlalchemy import text
    stmt = text("SELECT * FROM posts WHERE title LIKE :title")
    result = session.execute(stmt, {"title": "%SQLAlchemy%"})
    for row in result:
        print(row)

性能优化技巧

  1. 合理使用加载策略:根据查询需求选择合适的加载策略,避免 N+1 查询问题。
  2. 批量操作:使用 bulk_insert_mappingsbulk_update_mappingsbulk_save_objects 进行批量操作。
with Session() as session:
    # 批量插入
    users_data = [
        {"username": f"user_{i}", "email": f"user_{i}@example.com"}
        for i in range(100)
    ]
    session.bulk_insert_mappings(User, users_data)
    session.commit()
  1. 使用索引:在数据库层面创建适当的索引。
from sqlalchemy import Index

# 在模型中定义索引
class User(Base):
    __tablename__ = "users"
    # ... 属性定义 ...
  
    # 定义索引
    __table_args__ = (
        Index("idx_username", "username"),
        Index("idx_email", "email"),
    )
  1. 使用 yield_per:处理大量数据时分批获取结果。
with Session() as session:
    # 分批处理大量数据
    stmt = select(User)
    for user in session.scalars(stmt).yield_per(100):
        # 每次处理 100 条记录
        print(user.username)

常见问题与解决方案

1. 会话过期问题

# 设置会话不自动过期对象
Session = sessionmaker(bind=engine, expire_on_commit=False)

2. 懒加载异常

# 在会话外访问关系属性时可能出现的问题
with Session() as session:
    user = session.execute(select(User).where(User.id == 1)).scalar_one()

# 会话已关闭,访问关系属性会抛出异常
# 解决方法:使用急加载或在会话内完成所有操作
with Session() as session:
    user = session.execute(
        select(User).options(joinedload(User.posts)).where(User.id == 1)
    ).scalar_one()
    # 在会话内处理数据或将需要的数据提取出来
    posts_data = [(post.id, post.title) for post in user.posts]

# 会话外可以安全访问 posts_data

3. 连接池配置

# 配置连接池以处理长时间运行的应用
engine = create_engine(
    url_object,
    pool_size=10,           # 连接池大小
    max_overflow=20,        # 超出 pool_size 后的最大连接数
    pool_timeout=30,        # 获取连接的超时时间
    pool_recycle=1800,      # 连接回收时间(秒)
    pool_pre_ping=True      # 使用前检查连接是否有效
)
0

评论区