网站首页 > 技术文章 正文
今日目标
o 掌握SQLAlchemy ORM的高级特性
o 学会复杂查询和关系操作
o 了解数据库性能优化技术
o 掌握事务管理和连接池
o 学会数据库迁移和版本控制
SQLAlchemy ORM进阶
1. 复杂关系模型
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, Text, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.sql import func
from datetime import datetime
Base = declarative_base()
class User(Base):
"""用户模型"""
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(120), unique=True, nullable=False)
password_hash = Column(String(128), nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
# 关系定义
posts = relationship('Post', back_populates='author', cascade='all, delete-orphan')
comments = relationship('Comment', back_populates='author', cascade='all, delete-orphan')
profile = relationship('UserProfile', back_populates='user', uselist=False)
def __repr__(self):
return f'<User {self.username}>'
class UserProfile(Base):
"""用户资料模型"""
__tablename__ = 'user_profiles'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('users.id'), unique=True)
first_name = Column(String(50))
last_name = Column(String(50))
bio = Column(Text)
avatar_url = Column(String(200))
birth_date = Column(DateTime)
# 关系定义
user = relationship('User', back_populates='profile')
def __repr__(self):
return f'<UserProfile {self.user.username}>'
class Category(Base):
"""分类模型"""
__tablename__ = 'categories'
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False, unique=True)
slug = Column(String(100), unique=True, nullable=False)
description = Column(Text)
parent_id = Column(Integer, ForeignKey('categories.id'))
# 自引用关系
parent = relationship('Category', remote_side=[id])
children = relationship('Category', back_populates='parent')
posts = relationship('Post', back_populates='category')
def __repr__(self):
return f'<Category {self.name}>'
class Post(Base):
"""文章模型"""
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
title = Column(String(200), nullable=False)
slug = Column(String(200), unique=True, nullable=False)
content = Column(Text, nullable=False)
excerpt = Column(Text)
author_id = Column(Integer, ForeignKey('users.id'), nullable=False)
category_id = Column(Integer, ForeignKey('categories.id'))
status = Column(String(20), default='draft') # draft, published, archived
view_count = Column(Integer, default=0)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
published_at = Column(DateTime)
# 关系定义
author = relationship('User', back_populates='posts')
category = relationship('Category', back_populates='posts')
comments = relationship('Comment', back_populates='post', cascade='all, delete-orphan')
tags = relationship('Tag', secondary='post_tags', back_populates='posts')
def __repr__(self):
return f'<Post {self.title}>'
class Tag(Base):
"""标签模型"""
__tablename__ = 'tags'
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True, nullable=False)
slug = Column(String(50), unique=True, nullable=False)
# 多对多关系
posts = relationship('Post', secondary='post_tags', back_populates='tags')
def __repr__(self):
return f'<Tag {self.name}>'
class PostTag(Base):
"""文章标签关联表"""
__tablename__ = 'post_tags'
post_id = Column(Integer, ForeignKey('posts.id'), primary_key=True)
tag_id = Column(Integer, ForeignKey('tags.id'), primary_key=True)
class Comment(Base):
"""评论模型"""
__tablename__ = 'comments'
id = Column(Integer, primary_key=True)
content = Column(Text, nullable=False)
author_id = Column(Integer, ForeignKey('users.id'), nullable=False)
post_id = Column(Integer, ForeignKey('posts.id'), nullable=False)
parent_id = Column(Integer, ForeignKey('comments.id'))
is_approved = Column(Boolean, default=False)
created_at = Column(DateTime, default=func.now())
# 关系定义
author = relationship('User', back_populates='comments')
post = relationship('Post', back_populates='comments')
parent = relationship('Comment', remote_side=[id])
replies = relationship('Comment', back_populates='parent')
def __repr__(self):
return f'<Comment {self.id}>'
2. 数据库连接和会话管理
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.pool import QueuePool
import os
# 数据库配置
DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///blog.db')
# 创建引擎
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
pool_recycle=3600,
echo=True # 开发时显示SQL语句
)
# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 创建线程安全的会话
Session = scoped_session(SessionLocal)
def get_db():
"""获取数据库会话"""
db = SessionLocal()
try:
yield db
finally:
db.close()
# 创建所有表
def create_tables():
Base.metadata.create_all(bind=engine)
# 删除所有表
def drop_tables():
Base.metadata.drop_all(bind=engine)
高级查询操作
1. 复杂查询
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, not_, func, desc, asc
from datetime import datetime, timedelta
def advanced_queries(db: Session):
"""高级查询示例"""
# 1. 条件查询
# 查找已发布的文章
published_posts = db.query(Post).filter(Post.status == 'published').all()
# 查找特定作者的文章
author_posts = db.query(Post).filter(Post.author_id == 1).all()
# 2. 复合条件查询
# 查找已发布且阅读量大于100的文章
popular_posts = db.query(Post).filter(
and_(
Post.status == 'published',
Post.view_count > 100
)
).all()
# 查找标题包含关键词或内容包含关键词的文章
keyword_posts = db.query(Post).filter(
or_(
Post.title.contains('Python'),
Post.content.contains('Python')
)
).all()
# 3. 排序和限制
# 按发布时间倒序排列,限制10条
recent_posts = db.query(Post).filter(
Post.status == 'published'
).order_by(desc(Post.published_at)).limit(10).all()
# 4. 聚合查询
# 统计每个用户的文章数量
user_post_counts = db.query(
User.username,
func.count(Post.id).label('post_count')
).join(Post).group_by(User.id, User.username).all()
# 统计每个分类的文章数量
category_post_counts = db.query(
Category.name,
func.count(Post.id).label('post_count')
).join(Post).group_by(Category.id, Category.name).all()
# 5. 子查询
# 查找评论数量最多的文章
from sqlalchemy import select
subquery = select(func.count(Comment.id)).where(Comment.post_id == Post.id).scalar_subquery()
posts_with_comment_count = db.query(
Post.title,
subquery.label('comment_count')
).order_by(desc('comment_count')).all()
# 6. 窗口函数
# 为每个分类的文章按发布时间排序并添加行号
from sqlalchemy import over
posts_with_rank = db.query(
Post.title,
Category.name.label('category_name'),
Post.published_at,
func.row_number().over(
partition_by=Post.category_id,
order_by=desc(Post.published_at)
).label('rank')
).join(Category).filter(Post.status == 'published').all()
return {
'published_posts': published_posts,
'popular_posts': popular_posts,
'user_post_counts': user_post_counts,
'category_post_counts': category_post_counts,
'posts_with_comment_count': posts_with_comment_count,
'posts_with_rank': posts_with_rank
}
2. 关系查询
def relationship_queries(db: Session):
"""关系查询示例"""
# 1. 预加载关系
# 使用 joinedload 预加载作者信息
from sqlalchemy.orm import joinedload, selectinload
posts_with_author = db.query(Post).options(
joinedload(Post.author)
).filter(Post.status == 'published').all()
# 使用 selectinload 预加载评论
posts_with_comments = db.query(Post).options(
selectinload(Post.comments)
).filter(Post.status == 'published').all()
# 2. 多级关系查询
# 查找所有评论及其作者和文章信息
comments_with_details = db.query(Comment).options(
joinedload(Comment.author),
joinedload(Comment.post)
).all()
# 3. 反向关系查询
# 查找用户的所有文章和评论
user_with_content = db.query(User).options(
selectinload(User.posts),
selectinload(User.comments)
).filter(User.username == 'admin').first()
# 4. 多对多关系查询
# 查找包含特定标签的文章
python_posts = db.query(Post).join(Post.tags).filter(Tag.name == 'Python').all()
# 查找文章的所有标签
post_with_tags = db.query(Post).options(
selectinload(Post.tags)
).filter(Post.id == 1).first()
# 5. 自引用关系查询
# 查找分类及其子分类
categories_with_children = db.query(Category).options(
selectinload(Category.children)
).filter(Category.parent_id.is_(None)).all()
# 查找评论及其回复
comments_with_replies = db.query(Comment).options(
selectinload(Comment.replies)
).filter(Comment.parent_id.is_(None)).all()
return {
'posts_with_author': posts_with_author,
'posts_with_comments': posts_with_comments,
'comments_with_details': comments_with_details,
'user_with_content': user_with_content,
'python_posts': python_posts,
'post_with_tags': post_with_tags,
'categories_with_children': categories_with_children,
'comments_with_replies': comments_with_replies
}
3. 批量操作
def bulk_operations(db: Session):
"""批量操作示例"""
# 1. 批量插入
# 创建多个用户
new_users = [
User(username=f'user{i}', email=f'user{i}@example.com', password_hash='hash')
for i in range(1, 6)
]
db.add_all(new_users)
db.commit()
# 2. 批量更新
# 将所有草稿状态的文章标记为已发布
db.query(Post).filter(Post.status == 'draft').update({
Post.status: 'published',
Post.published_at: func.now()
})
db.commit()
# 3. 批量删除
# 删除所有未批准的评论
deleted_count = db.query(Comment).filter(Comment.is_approved == False).delete()
db.commit()
# 4. 使用 bulk_insert_mappings
from sqlalchemy.orm import bulk_insert_mappings
user_data = [
{'username': 'bulk_user1', 'email': 'bulk1@example.com', 'password_hash': 'hash'},
{'username': 'bulk_user2', 'email': 'bulk2@example.com', 'password_hash': 'hash'},
{'username': 'bulk_user3', 'email': 'bulk3@example.com', 'password_hash': 'hash'},
]
db.bulk_insert_mappings(User, user_data)
db.commit()
# 5. 使用 bulk_update_mappings
from sqlalchemy.orm import bulk_update_mappings
# 获取需要更新的用户
users_to_update = db.query(User).filter(User.username.like('bulk_user%')).all()
update_data = [
{'id': user.id, 'is_active': False}
for user in users_to_update
]
db.bulk_update_mappings(User, update_data)
db.commit()
return {
'deleted_comments': deleted_count,
'updated_users': len(update_data)
}
事务管理
1. 基本事务操作
from sqlalchemy.exc import SQLAlchemyError
from contextlib import contextmanager
@contextmanager
def transaction_scope(db: Session):
"""事务上下文管理器"""
try:
yield db
db.commit()
except Exception as e:
db.rollback()
raise e
def transaction_examples(db: Session):
"""事务操作示例"""
# 1. 基本事务
try:
# 创建新用户
new_user = User(
username='transaction_user',
email='transaction@example.com',
password_hash='hash'
)
db.add(new_user)
# 创建用户资料
user_profile = UserProfile(
user=new_user,
first_name='Transaction',
last_name='User',
bio='Created in transaction'
)
db.add(user_profile)
# 提交事务
db.commit()
print("事务提交成功")
except SQLAlchemyError as e:
# 回滚事务
db.rollback()
print(f"事务回滚: {e}")
# 2. 使用上下文管理器
with transaction_scope(db):
# 创建文章
post = Post(
title='Transaction Test Post',
slug='transaction-test-post',
content='This post was created in a transaction',
author_id=1,
status='published'
)
db.add(post)
# 创建标签
tag = Tag(name='Transaction', slug='transaction')
db.add(tag)
# 关联文章和标签
post.tags.append(tag)
# 3. 嵌套事务
try:
# 外层事务
user = User(username='nested_user', email='nested@example.com', password_hash='hash')
db.add(user)
db.flush() # 获取用户ID但不提交
try:
# 内层事务
profile = UserProfile(user_id=user.id, first_name='Nested', last_name='User')
db.add(profile)
db.commit() # 提交内层事务
print("嵌套事务成功")
except SQLAlchemyError as e:
db.rollback()
print(f"内层事务回滚: {e}")
except SQLAlchemyError as e:
db.rollback()
print(f"外层事务回滚: {e}")
2. 事务隔离级别
from sqlalchemy import text
def isolation_level_examples(db: Session):
"""事务隔离级别示例"""
# 1. 设置隔离级别
# 读未提交
db.execute(text("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED"))
# 读已提交
db.execute(text("SET TRANSACTION ISOLATION LEVEL READ COMMITTED"))
# 可重复读
db.execute(text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"))
# 串行化
db.execute(text("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"))
# 2. 脏读示例(需要两个会话)
# 会话1:更新数据但不提交
session1 = SessionLocal()
user = session1.query(User).filter(User.id == 1).first()
user.username = 'dirty_read_test'
session1.flush() # 不提交
# 会话2:读取未提交的数据
session2 = SessionLocal()
user2 = session2.query(User).filter(User.id == 1).first()
print(f"脏读结果: {user2.username}")
session1.rollback()
session1.close()
session2.close()
# 3. 不可重复读示例
# 会话1:读取数据
session1 = SessionLocal()
user1 = session1.query(User).filter(User.id == 1).first()
print(f"第一次读取: {user1.username}")
# 会话2:更新数据并提交
session2 = SessionLocal()
user2 = session2.query(User).filter(User.id == 1).first()
user2.username = 'updated_username'
session2.commit()
session2.close()
# 会话1:再次读取数据
user1_again = session1.query(User).filter(User.id == 1).first()
print(f"第二次读取: {user1_again.username}")
session1.close()
性能优化
1. 查询优化
def query_optimization(db: Session):
"""查询优化示例"""
# 1. 使用索引
# 在模型中定义索引
from sqlalchemy import Index
# 为常用查询字段创建索引
Index('idx_posts_status_published_at', Post.status, Post.published_at)
Index('idx_posts_author_status', Post.author_id, Post.status)
Index('idx_comments_post_approved', Comment.post_id, Comment.is_approved)
# 2. 避免N+1查询问题
# 错误做法:N+1查询
posts = db.query(Post).filter(Post.status == 'published').all()
for post in posts:
print(f"文章: {post.title}, 作者: {post.author.username}") # 每次都会查询作者
# 正确做法:预加载
posts = db.query(Post).options(
joinedload(Post.author)
).filter(Post.status == 'published').all()
for post in posts:
print(f"文章: {post.title}, 作者: {post.author.username}") # 一次查询获取所有数据
# 3. 使用子查询优化
# 查找评论数量最多的前10篇文章
from sqlalchemy import select
comment_count_subquery = select([
Comment.post_id,
func.count(Comment.id).label('comment_count')
]).group_by(Comment.post_id).subquery()
top_posts = db.query(
Post.title,
comment_count_subquery.c.comment_count
).join(
comment_count_subquery,
Post.id == comment_count_subquery.c.post_id
).order_by(
desc(comment_count_subquery.c.comment_count)
).limit(10).all()
# 4. 分页优化
# 使用游标分页而不是偏移分页
def cursor_pagination(db: Session, last_id: int = None, limit: int = 10):
query = db.query(Post).filter(Post.status == 'published')
if last_id:
query = query.filter(Post.id > last_id)
return query.order_by(Post.id).limit(limit).all()
# 5. 使用原生SQL优化复杂查询
def complex_query_with_sql(db: Session):
sql = """
SELECT
p.title,
u.username as author,
c.name as category,
COUNT(cm.id) as comment_count,
AVG(cm.id) as avg_comment_id
FROM posts p
JOIN users u ON p.author_id = u.id
LEFT JOIN categories c ON p.category_id = c.id
LEFT JOIN comments cm ON p.id = cm.post_id AND cm.is_approved = 1
WHERE p.status = 'published'
GROUP BY p.id, p.title, u.username, c.name
HAVING COUNT(cm.id) > 0
ORDER BY comment_count DESC
LIMIT 10
"""
result = db.execute(text(sql))
return [dict(row) for row in result]
return {
'top_posts': top_posts,
'complex_query_result': complex_query_with_sql(db)
}
2. 连接池优化
from sqlalchemy.pool import QueuePool, StaticPool
from sqlalchemy import event
def connection_pool_optimization():
"""连接池优化示例"""
# 1. 配置连接池参数
engine = create_engine(
'sqlite:///optimized.db',
poolclass=QueuePool,
pool_size=20, # 连接池大小
max_overflow=30, # 最大溢出连接数
pool_pre_ping=True, # 连接前ping测试
pool_recycle=3600, # 连接回收时间(秒)
pool_timeout=30, # 获取连接超时时间
echo=False # 生产环境关闭SQL日志
)
# 2. 连接池事件监听
@event.listens_for(engine, 'checkout')
def receive_checkout(dbapi_connection, connection_record, connection_proxy):
print("连接被借出")
@event.listens_for(engine, 'checkin')
def receive_checkin(dbapi_connection, connection_record):
print("连接被归还")
# 3. 连接池统计
def get_pool_stats(engine):
pool = engine.pool
return {
'size': pool.size(),
'checked_in': pool.checkedin(),
'checked_out': pool.checkedout(),
'overflow': pool.overflow(),
'invalid': pool.invalid()
}
# 4. 使用连接池的最佳实践
def best_practices():
# 使用上下文管理器确保连接正确归还
with engine.connect() as connection:
result = connection.execute(text("SELECT 1"))
return result.fetchone()
return {
'pool_stats': get_pool_stats(engine),
'test_result': best_practices()
}
数据库迁移
1. Alembic迁移
# alembic.ini 配置
"""
[alembic]
script_location = alembic
sqlalchemy.url = sqlite:///blog.db
"""
# env.py 配置
"""
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
import os
import sys
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from models import Base
config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
"""
# 创建迁移脚本
def create_migration():
"""创建迁移脚本示例"""
# 1. 初始化Alembic
# alembic init alembic
# 2. 创建初始迁移
# alembic revision --autogenerate -m "Initial migration"
# 3. 应用迁移
# alembic upgrade head
# 4. 创建新的迁移
# alembic revision --autogenerate -m "Add user profile table"
# 5. 回滚迁移
# alembic downgrade -1
pass
# 迁移脚本示例
"""
def upgrade():
# 创建用户资料表
op.create_table('user_profiles',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('first_name', sa.String(length=50), nullable=True),
sa.Column('last_name', sa.String(length=50), nullable=True),
sa.Column('bio', sa.Text(), nullable=True),
sa.Column('avatar_url', sa.String(length=200), nullable=True),
sa.Column('birth_date', sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
# 添加索引
op.create_index(op.f('ix_user_profiles_user_id'), 'user_profiles', ['user_id'], unique=True)
def downgrade():
op.drop_index(op.f('ix_user_profiles_user_id'), table_name='user_profiles')
op.drop_table('user_profiles')
"""
今日总结
今天我们学习了SQLAlchemy ORM的高级特性:
1. 复杂关系模型:一对多、多对多、自引用关系
2. 高级查询操作:复杂条件、聚合查询、子查询、窗口函数
3. 关系查询优化:预加载、延迟加载、批量加载
4. 事务管理:事务控制、隔离级别、嵌套事务
5. 性能优化:查询优化、连接池、索引使用
6. 数据库迁移:Alembic工具、版本控制
掌握这些高级特性可以构建高性能、可维护的数据库应用。
练习建议
1. 设计一个复杂的电商数据库模型
2. 实现高性能的分页查询
3. 优化现有应用的数据库查询
4. 创建完整的数据库迁移流程
猜你喜欢
- 2025-08-06 生产环境中使用的十大 Python 设计模式
- 2025-08-06 面试必备:Python内存管理机制(建议收藏)
- 2025-08-06 服务端开发面试必背——消息队列及它的主要用途和优点。附代码
- 2025-08-06 Python 栈:深度解析与应用
- 2025-08-06 Python中的多进程
- 2025-08-06 Python Logging 最佳实践
- 2025-08-06 Python并发数据结构实现原理
- 2025-08-06 用SendGrid和Redis队列用Python调度国际空间站的电子邮件
- 2025-08-06 Python倒车请注意!负步长range的10个高能用法,让代码效率翻倍
- 2025-08-06 python collections 的超赞功能
- 08-06生产环境中使用的十大 Python 设计模式
- 08-06面试必备:Python内存管理机制(建议收藏)
- 08-06服务端开发面试必背——消息队列及它的主要用途和优点。附代码
- 08-06Python 栈:深度解析与应用
- 08-06Python中的多进程
- 08-06Python Logging 最佳实践
- 08-06Python并发数据结构实现原理
- 08-06用SendGrid和Redis队列用Python调度国际空间站的电子邮件
- 最近发表
- 标签列表
-
- python中类 (31)
- python 迭代 (34)
- python 小写 (35)
- python怎么输出 (33)
- python 日志 (35)
- python语音 (31)
- python 工程师 (34)
- python3 安装 (31)
- python音乐 (31)
- 安卓 python (32)
- python 小游戏 (32)
- python 安卓 (31)
- python聚类 (34)
- python向量 (31)
- python大全 (31)
- python次方 (33)
- python桌面 (32)
- python总结 (34)
- python浏览器 (32)
- python 请求 (32)
- python 前端 (32)
- python验证码 (33)
- python 题目 (32)
- python 文件写 (33)
- python中的用法 (32)