数据库操作
大约 12 分钟约 3536 字
数据库操作
简介
Python 通过标准库和第三方包支持多种数据库操作。本篇介绍使用 SQLAlchemy ORM、sqlite3 标准库和常见数据库的连接与操作方法。
Python 数据库访问的核心挑战在于"连接管理"和"数据映射"两个问题。连接管理涉及连接池、超时、重连和事务隔离等底层细节;数据映射则涉及将数据库的行记录转换为 Python 对象(ORM)或字典(Row factory)。SQLAlchemy 通过分层架构解决了这些问题:Engine 层管理连接池,Connection 层执行 SQL,ORM 层提供对象关系映射。
特点
sqlite3 标准库
基本操作
import sqlite3
# 连接数据库(自动创建)
conn = sqlite3.connect('app.db')
conn.row_factory = sqlite3.Row # 返回字典风格结果
cursor = conn.cursor()
# 建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
age INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 插入数据
cursor.execute('INSERT INTO users (name, email, age) VALUES (?, ?, ?)',
('张三', 'zhang@example.com', 30))
# 批量插入
users = [
('李四', 'li@example.com', 25),
('王五', 'wang@example.com', 35),
('赵六', 'zhao@example.com', 28),
]
cursor.executemany('INSERT INTO users (name, email, age) VALUES (?, ?, ?)', users)
conn.commit()
# 查询
cursor.execute('SELECT * FROM users WHERE age > ?', (25,))
for row in cursor.fetchall():
print(dict(row))
# 更新
cursor.execute('UPDATE users SET age = ? WHERE name = ?', (31, '张三'))
conn.commit()
# 删除
cursor.execute('DELETE FROM users WHERE id = ?', (1,))
conn.commit()
conn.close()上下文管理器与安全操作
from contextlib import contextmanager
import sqlite3
import logging
logger = logging.getLogger(__name__)
@contextmanager
def get_db_connection(db_path='app.db'):
"""数据库连接上下文管理器"""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL") # Write-Ahead Logging
conn.execute("PRAGMA foreign_keys=ON") # 启用外键约束
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
logger.error(f"数据库操作失败,已回滚: {e}")
raise
finally:
conn.close()
# 使用
with get_db_connection() as conn:
result = conn.execute('SELECT * FROM users').fetchall()
users = [dict(row) for row in result]
# SQL 注入防护 —— 永远使用参数化查询
# 危险:不要使用字符串拼接
# cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")
# 安全:使用占位符
with get_db_connection() as conn:
user = conn.execute(
"SELECT * FROM users WHERE name = ? AND age > ?",
(user_input, min_age)
).fetchone()
# 执行计划分析
with get_db_connection() as conn:
plan = conn.execute("EXPLAIN QUERY PLAN SELECT * FROM users WHERE email = ?", ("test@",)).fetchall()
for row in plan:
print(dict(row))sqlite3 事务与并发
import sqlite3
import threading
# SQLite 并发注意事项:
# 1. SQLite 使用文件级锁,写操作会锁定整个数据库
# 2. WAL 模式允许同时读和写,但写仍然是串行的
# 3. 高并发写入场景应使用 PostgreSQL 或 MySQL
# 隔离级别
conn = sqlite3.connect("app.db", isolation_level="DEFERRED") # 默认
# DEFERRED —— 不立即获取锁,首次写操作时获取
# IMMEDIATE —— 立即获取保留锁
# EXCLUSIVE —— 立即获取独占锁
# 批量插入优化
def bulk_insert_fast(items: list[tuple]):
"""快速批量插入"""
conn = sqlite3.connect("app.db")
cursor = conn.cursor()
try:
# 关闭自动提交,使用显式事务
cursor.execute("BEGIN TRANSACTION")
cursor.executemany(
"INSERT INTO users (name, email, age) VALUES (?, ?, ?)",
items
)
conn.commit()
except Exception as e:
conn.rollback()
raise
finally:
conn.close()
# 性能对比:事务内批量 vs 逐条提交
# 逐条提交 10000 条:约 10-20 秒
# 事务内批量 10000 条:约 0.1-0.5 秒
# 差距 20-200 倍!SQLAlchemy ORM
模型定义
# pip install sqlalchemy
from sqlalchemy import create_engine, Column, Integer, String, DateTime, ForeignKey, Boolean, Text
from sqlalchemy.orm import declarative_base, relationship, sessionmaker, Mapped, mapped_column
from datetime import datetime
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(50), nullable=False)
email: Mapped[str] = mapped_column(String(120), unique=True, nullable=False)
age: Mapped[int | None] = mapped_column(Integer)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
# 关系
posts = relationship('Post', back_populates='author')
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'email': self.email,
'age': self.age,
'is_active': self.is_active
}
class Post(Base):
__tablename__ = 'posts'
id: Mapped[int] = mapped_column(Integer, primary_key=True)
title: Mapped[str] = mapped_column(String(200), nullable=False)
content: Mapped[str | None] = mapped_column(Text)
user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.now)
author = relationship('User', back_populates='posts')
# 初始化
engine = create_engine('sqlite:///app.db', echo=False)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)CRUD 操作
# 创建
session = Session()
user = User(name='张三', email='zhang@example.com', age=30)
session.add(user)
session.commit()
print(f"用户ID: {user.id}")
# 批量创建
users = [
User(name=name, email=email, age=age)
for name, email, age in [
('李四', 'li@example.com', 25),
('王五', 'wang@example.com', 35),
]
]
session.add_all(users)
session.commit()
# 查询
from sqlalchemy import or_, and_, desc
# 基本查询
all_users = session.query(User).all()
user = session.query(User).filter_by(name='张三').first()
# 条件查询
adults = session.query(User).filter(User.age >= 30).all()
search = session.query(User).filter(
or_(User.name.contains('张'), User.email.contains('example'))
).all()
# 排序和分页
page = session.query(User).order_by(desc(User.age)).limit(10).offset(0).all()
# 聚合
from sqlalchemy import func
stats = session.query(
User.age,
func.count(User.id).label('count'),
func.avg(User.age).label('avg_age')
).group_by(User.age).all()
# 更新
session.query(User).filter_by(name='张三').update({'age': 31})
session.commit()
# 删除
session.query(User).filter_by(id=1).delete()
session.commit()
session.close()SQLAlchemy 进阶查询
from sqlalchemy import or_, and_, desc, func, exists
# 1. 联表查询
from sqlalchemy.orm import joinedload
# 方式一:join
results = session.query(User, Post).join(Post, User.id == Post.user_id).all()
# 方式二:关系加载(自动关联)
results = session.query(User).options(joinedload(User.posts)).all()
# 2. 子查询
subq = session.query(Post.user_id, func.count(Post.id).label('post_count')).group_by(Post.user_id).subquery()
results = session.query(User, subq.c.post_count).outerjoin(subq, User.id == subq.c.user_id).all()
# 3. exists 子查询
from sqlalchemy import exists
active_authors = session.query(User).filter(
exists().where(Post.author_id == User.id)
).all()
# 4. 复合条件
results = session.query(User).filter(
and_(
User.age >= 18,
User.age <= 60,
or_(
User.name.like('张%'),
User.email.like('%@example.com')
)
)
).all()
# 5. 分页工具函数
from typing import Any
def paginate(query, page: int = 1, per_page: int = 10) -> dict[str, Any]:
"""通用分页函数"""
total = query.count()
items = query.offset((page - 1) * per_page).limit(per_page).all()
return {
"items": items,
"total": total,
"page": page,
"per_page": per_page,
"pages": (total + per_page - 1) // per_page,
}
result = paginate(session.query(User).order_by(User.id), page=2, per_page=5)
print(f"第 {result['page']} 页,共 {result['pages']} 页")
# 6. 事务管理
from contextlib import contextmanager
@contextmanager
def transaction_scope(SessionLocal):
"""事务作用域"""
session = SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# 使用
with transaction_scope(Session) as session:
user = User(name="新用户", email="new@example.com")
session.add(user)
# 如果这里抛出异常,自动回滚MySQL/PostgreSQL 连接
多数据库连接
# MySQL: pip install pymysql
# PostgreSQL: pip install psycopg2-binary
# SQL Server: pip install pyodbc
from sqlalchemy import create_engine
# SQLite
engine_sqlite = create_engine('sqlite:///app.db')
# MySQL
engine_mysql = create_engine(
'mysql+pymysql://user:password@localhost:3306/mydb',
pool_size=10,
max_overflow=20,
pool_recycle=3600
)
# PostgreSQL
engine_pg = create_engine(
'postgresql+psycopg2://user:password@localhost:5432/mydb',
pool_size=10
)
# SQL Server
engine_mssql = create_engine(
'mssql+pyodbc://user:password@server/database?driver=ODBC+Driver+17+for+SQL+Server'
)连接池配置详解
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# 连接池参数详解
engine = create_engine(
"postgresql+psycopg2://user:pass@localhost/db",
poolclass=QueuePool, # 连接池类(默认)
pool_size=5, # 常驻连接数(不是最大连接数)
max_overflow=10, # 超出 pool_size 后最多创建的临时连接
pool_timeout=30, # 获取连接的超时时间(秒)
pool_recycle=3600, # 连接回收时间(秒),防止 MySQL 的 wait_timeout 断开
pool_pre_ping=True, # 每次使用前发送测试查询,检测连接是否存活
echo=False, # 是否打印 SQL 语句(调试时设为 True)
echo_pool=False, # 是否打印连接池操作
)
# pool_pre_ping 的重要性:
# MySQL 默认 wait_timeout=28800(8小时)
# 如果连接空闲超过 8 小时,MySQL 会断开连接
# pool_recycle=3600 每小时回收一次,防止这种情况
# pool_pre_ping=True 在使用前检测连接,双重保障
# 监控连接池状态
pool = engine.pool
print(f"当前活跃连接: {pool.checkedout()}")
print(f"当前空闲连接: {pool.overflow()}")
print(f"总连接数: {pool.size()}")数据库迁移(Alembic 简介)
# pip install alembic
# 初始化
# alembic init alembic
# 配置 alembic.ini 中的 sqlalchemy.url
# sqlalchemy.url = postgresql://user:pass@localhost/db
# 生成迁移脚本(自动检测模型变更)
# alembic revision --autogenerate -m "create users table"
# 执行迁移
# alembic upgrade head
# 回滚
# alembic downgrade -1优点
缺点
总结
Python 数据库操作:轻量级用 sqlite3(内置),ORM 用 SQLAlchemy。sqlite3 用 ? 占位符防注入,row_factory 获取字典结果。SQLAlchemy 核心:声明式模型(Column/relationship)、Session 管理会话、query 链式查询。多数据库用 create_engine 切换连接字符串。连接池参数:pool_size、max_overflow。生产环境建议用 SQLAlchemy + Alembic 做迁移。
关键知识点
- 永远使用参数化查询(? 占位符)防止 SQL 注入
- SQLite 使用文件级锁,写操作串行;WAL 模式提升并发读性能
- SQLAlchemy 的 Session 不是线程安全的,每个线程应使用独立的 Session
- pool_pre_ping=True 可以检测并丢弃已断开的数据库连接
- 事务内批量操作比逐条提交快 20-200 倍
- row_factory = sqlite3.Row 让查询结果支持字典风格访问
项目落地视角
- 统一虚拟环境、依赖锁定、格式化和日志方案。
- 把入口、配置、业务逻辑和工具函数拆开,避免单文件膨胀。
- 对网络请求、文件读写和数据处理结果做异常与样本校验。
- 保留执行计划、样本 SQL、索引定义和优化前后指标。
- 使用 Alembic 管理数据库迁移,版本化所有 schema 变更
常见误区
- 把临时脚本直接当生产代码使用。
- 忽略依赖版本、编码、路径和时区差异。
- 只会写 happy path,没有补超时、重试和资源释放。
- 脱离真实数据分布设计索引。
- 使用字符串拼接构建 SQL 查询导致注入漏洞
- 忘记关闭数据库连接导致连接泄漏
- 在高并发场景使用 SQLite(应使用 PostgreSQL 或 MySQL)
进阶路线
- 学习 Alembic 实现生产级数据库迁移管理
- 掌握 asyncpg + SQLAlchemy async 实现异步数据库访问
- 研究 SQLAlchemy Core(非 ORM 层)编写高性能原生 SQL
- 了解读写分离、分库分表和连接池调优策略
适用场景
- 当你准备把《数据库操作》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合脚本自动化、数据处理、Web 开发和测试工具建设。
- 当需求强调快速迭代和丰富生态时,Python 往往能快速起步。
落地建议
- 统一使用虚拟环境与依赖锁定,避免环境漂移。
- 对核心函数补类型注解、异常处理和日志,减少"脚本黑盒"。
- 一旦脚本进入生产链路,及时补测试和监控。
- 数据库配置从环境变量读取,不要硬编码在代码中
排错清单
- 先确认当前解释器、虚拟环境和依赖版本是否正确。
- 检查编码、路径、时区和第三方库行为差异。
- 排查同步阻塞、数据库连接未释放或网络请求无超时。
- 确认连接池配置是否合理(pool_size、max_overflow、pool_recycle)
- 检查事务是否正确提交或回滚
复盘问题
- 如果把《数据库操作》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《数据库操作》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《数据库操作》最大的收益和代价分别是什么?
SQLAlchemy 异步操作
AsyncSession 异步 ORM
# pip install sqlalchemy[asyncio] asyncpg
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select, update, delete
from sqlalchemy.orm import selectinload
# 异步引擎
async_engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=10,
max_overflow=20,
pool_recycle=3600,
echo=False,
)
# 异步 Session 工厂
AsyncSessionLocal = async_sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False,
)
# 异步 CRUD
async def async_crud_examples():
async with AsyncSessionLocal() as session:
# 创建
user = User(name="张三", email="zhang@example.com", age=30)
session.add(user)
await session.commit()
print(f"创建用户 ID: {user.id}")
# 查询
result = await session.execute(
select(User).where(User.age >= 25).order_by(User.id)
)
users = result.scalars().all()
for u in users:
print(f"用户: {u.name}, 年龄: {u.age}")
# 带关系加载的查询
result = await session.execute(
select(User).options(selectinload(User.posts)).where(User.id == 1)
)
user_with_posts = result.scalar_one_or_none()
if user_with_posts:
print(f"用户 {user_with_posts.name} 有 {len(user_with_posts.posts)} 篇文章")
# 批量更新
await session.execute(
update(User).where(User.age < 20).values(is_active=False)
)
await session.commit()
# 删除
await session.execute(delete(User).where(User.id == 999))
await session.commit()
# 异步事务
async def transfer_data_async():
"""异步事务示例"""
async with AsyncSessionLocal() as session:
async with session.begin():
# 在事务中执行多个操作
user = User(name="事务用户", email="tx@example.com")
session.add(user)
post = Post(title="事务文章", content="测试", user_id=user.id)
session.add(post)
# 事务结束自动 commit,异常自动 rollbackSQLAlchemy 事件系统
from sqlalchemy import event
# 监听连接事件
@event.listens_for(async_engine.sync_engine, "connect")
def receive_connect(dbapi_connection, connection_record):
"""连接创建时触发"""
print(f"[DB] 新连接建立: {id(dbapi_connection)}")
@event.listens_for(async_engine.sync_engine, "checkout")
def receive_checkout(dbapi_connection, connection_record, connection_proxy):
"""连接从池中取出时触发"""
pass
@event.listens_for(async_engine.sync_engine, "checkin")
def receive_checkin(dbapi_connection, connection_record):
"""连接归还到池中时触发"""
pass
# 监听 Session 事件
@event.listens_for(AsyncSession, "before_flush")
def before_flush(session, flush_context, instances):
"""Flush 前触发 — 自动填充审计字段"""
for instance in session.new:
if hasattr(instance, 'created_at') and instance.created_at is None:
instance.created_at = datetime.now()
for instance in session.dirty:
if hasattr(instance, 'updated_at'):
instance.updated_at = datetime.now()Alembic 迁移最佳实践
# alembic/env.py 配置
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
from myapp.models import Base # 导入所有模型
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline():
"""离线模式 — 只生成 SQL 不执行"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""在线模式 — 连接数据库执行迁移"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
# 对比类型变化
compare_type=True,
compare_server_default=True,
)
with context.begin_transaction():
context.run_migrations()# 迁移脚本示例:alembic/versions/001_create_users.py
"""create users table
Revision ID: 001
Create Date: 2024-01-15
"""
from alembic import op
import sqlalchemy as sa
# revision 标识
revision = '001'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table(
'users',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('name', sa.String(50), nullable=False),
sa.Column('email', sa.String(120), nullable=False, unique=True),
sa.Column('age', sa.Integer(), nullable=True),
sa.Column('is_active', sa.Boolean(), server_default='1'),
sa.Column('created_at', sa.DateTime(), server_default=sa.func.now()),
sa.PrimaryKeyConstraint('id'),
)
# 创建索引
op.create_index('ix_users_email', 'users', ['email'], unique=True)
op.create_index('ix_users_name', 'users', ['name'])
def downgrade():
op.drop_index('ix_users_name', table_name='users')
op.drop_index('ix_users_email', table_name='users')
op.drop_table('users')数据库操作工具函数
"""数据库工具函数集"""
from contextlib import contextmanager
from typing import Any, Generator, TypeVar, Generic
from dataclasses import dataclass
T = TypeVar('T')
@dataclass
class PageResult(Generic[T]):
"""通用分页结果"""
items: list[T]
total: int
page: int
per_page: int
pages: int
@property
def has_next(self) -> bool:
return self.page < self.pages
@property
def has_prev(self) -> bool:
return self.page > 1
def paginate_query(query, page: int = 1, per_page: int = 20) -> PageResult:
"""SQLAlchemy 查询分页"""
total = query.count()
items = query.offset((page - 1) * per_page).limit(per_page).all()
return PageResult(
items=items,
total=total,
page=page,
per_page=per_page,
pages=(total + per_page - 1) // per_page,
)
# 批量操作工具
def batch_insert(session, model_class, data_list: list[dict], batch_size: int = 1000):
"""批量插入数据(分批提交避免内存溢出)"""
for i in range(0, len(data_list), batch_size):
batch = data_list[i:i + batch_size]
objects = [model_class(**data) for data in batch]
session.bulk_save_objects(objects)
session.commit()
print(f"已插入 {min(i + batch_size, len(data_list))}/{len(data_list)} 条")
# UPSERT(插入或更新)
def upsert(session, model_class, unique_key: str, data: dict):
"""根据唯一键插入或更新"""
existing = session.query(model_class).filter(
getattr(model_class, unique_key) == data[unique_key]
).first()
if existing:
for key, value in data.items():
setattr(existing, key, value)
else:
session.add(model_class(**data))
session.commit()
return existing