Python 上下文管理器
大约 12 分钟约 3720 字
Python 上下文管理器
简介
上下文管理器是 Python 中用于管理资源分配与释放的协议机制,通过 with 语句确保资源在使用后正确清理。无论代码块是否抛出异常,exit 方法都会被执行,是文件操作、数据库连接、锁管理等资源安全使用的标准方式。
上下文管理器的核心思想是"获取-使用-释放"模式的自动化。在 Python 引入 with 语句之前,程序员需要手动编写 try/finally 块来确保资源释放,这不仅冗长而且容易遗漏。with 语句将这种模式抽象为一个协议,使得资源管理的代码更加简洁、更不容易出错。从 Python 内部看,with 语句的实现等价于:获取上下文管理器 -> 调用 __enter__() -> 执行代码块 -> 调用 __exit__()。
特点
核心原理
exit 的异常处理机制
class DetailedExitDemo:
"""演示 __exit__ 的完整异常处理行为"""
def __init__(self, name: str):
self.name = name
def __enter__(self):
print(f"[{self.name}] __enter__ 被调用")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print(f"[{self.name}] __exit__ 被调用")
print(f" exc_type = {exc_type}")
print(f" exc_val = {exc_val}")
print(f" exc_tb = {exc_tb}")
# 三种返回值的效果:
# return True — 抑制异常,with 块之后的代码继续执行
# return False — 不抑制异常,异常继续向上传播
# return None — 等同于 return False
if exc_type is None:
print(f" 正常退出,无异常")
return False
elif issubclass(exc_type, ValueError):
print(f" 捕获 ValueError,抑制异常")
return True # 抑制 ValueError
else:
print(f" 其他异常,不抑制")
return False
# 场景 1:正常退出
print("=== 场景 1: 正常退出 ===")
with DetailedExitDemo("正常"):
print(" 代码块正常执行")
print(" with 块之后继续执行\n")
# 场景 2:ValueError 被抑制
print("=== 场景 2: ValueError ===")
with DetailedExitDemo("异常"):
raise ValueError("测试错误")
print(" with 块之后继续执行(异常被抑制)\n")
# 场景 3:RuntimeError 不被抑制
print("=== 场景 3: RuntimeError ===")
try:
with DetailedExitDemo("异常"):
raise RuntimeError("运行时错误")
except RuntimeError as e:
print(f" 异常传播到外部: {e}\n")with 语句的执行流程
# with 语句的完整执行流程等价于以下代码:
#
# with EXPR as VAR:
# BLOCK
#
# 等价于:
#
# manager = EXPR
# VAR = manager.__enter__()
# try:
# BLOCK
# except Exception:
# if not manager.__exit__(*sys.exc_info()):
# raise
# else:
# manager.__exit__(None, None, None)
#
# 关键点:
# 1. EXPR 只在 with 语句开始时求值一次
# 2. __enter__ 的返回值绑定到 VAR(如果使用了 as)
# 3. __exit__ 始终会被调用,无论 BLOCK 是否抛出异常
# 4. __exit__ 的返回值决定是否抑制异常
import sys
class TraceContextManager:
"""追踪 with 语句执行流程的上下文管理器"""
def __init__(self, name):
self.name = name
def __enter__(self):
print(f" >> {self.name}: __enter__")
return f"{self.name}_resource"
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
print(f" >> {self.name}: __exit__ (正常)")
else:
print(f" >> {self.name}: __exit__ (异常: {exc_type.__name__})")
return False
print("--- 正常流程 ---")
with TraceContextManager("A") as resource_a:
print(f" 使用 {resource_a}")
print("\n--- 异常流程 ---")
try:
with TraceContextManager("B") as resource_b:
print(f" 使用 {resource_b}")
raise ValueError("出错了")
except ValueError:
print(" 异常被捕获")实现
基于类的上下文管理器:数据库会话
import sqlite3
from typing import Optional
class DatabaseSession:
"""数据库会话上下文管理器"""
def __init__(self, db_path: str, autocommit: bool = True):
self.db_path = db_path
self.autocommit = autocommit
self.conn: Optional[sqlite3.Connection] = None
self.cursor: Optional[sqlite3.Cursor] = None
def __enter__(self) -> sqlite3.Cursor:
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
self.cursor = self.conn.cursor()
return self.cursor
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None and self.autocommit:
self.conn.commit()
elif exc_type is not None:
self.conn.rollback()
print(f"事务回滚,异常: {exc_val}")
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()
return False # 不抑制异常
# 使用
with DatabaseSession("app.db") as cursor:
cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", ("张三", "zhang@example.com"))
cursor.execute("SELECT * FROM users")
for row in cursor.fetchall():
print(dict(row))
# 异常时自动回滚,正常时自动提交并关闭连接基于类的文件处理上下文管理器
import os
import shutil
from pathlib import Path
from typing import Optional
class SafeFileWriter:
"""原子写入文件的上下文管理器
先写入临时文件,成功后重命名为目标文件,
确保写入过程中不会损坏原有文件。
"""
def __init__(self, filepath: str, mode: str = "w", encoding: str = "utf-8"):
self.filepath = Path(filepath)
self.temp_path = Path(f"{filepath}.tmp")
self.mode = mode
self.encoding = encoding
self.file = None
def __enter__(self):
self.file = open(self.temp_path, self.mode, encoding=self.encoding)
return self.file
def __exit__(self, exc_type, exc_val, exc_tb):
if self.file:
self.file.close()
if exc_type is None:
# 写入成功,原子替换
if self.filepath.exists():
# 保留备份
backup_path = Path(f"{self.filepath}.bak")
if backup_path.exists():
backup_path.unlink()
shutil.move(str(self.filepath), str(backup_path))
shutil.move(str(self.temp_path), str(self.filepath))
else:
# 写入失败,删除临时文件
if self.temp_path.exists():
self.temp_path.unlink()
return False
# 使用
# with SafeFileWriter("config.json") as f:
# json.dump(config, f, ensure_ascii=False, indent=2)
# 原子写入:要么完全成功,要么完全不变
class TemporaryDirectory:
"""临时目录上下文管理器(Python 3.2+ 标准库已提供 tempfile.TemporaryDirectory)"""
def __init__(self, suffix: str = "", prefix: str = "tmp_"):
import tempfile
self.temp_dir = tempfile.mkdtemp(suffix=suffix, prefix=prefix)
def __enter__(self) -> str:
return self.temp_dir
def __exit__(self, exc_type, exc_val, exc_tb):
shutil.rmtree(self.temp_dir, ignore_errors=True)
return False
# 使用
# with TemporaryDirectory(prefix="test_") as tmpdir:
# # 在 tmpdir 中创建临时文件
# test_file = os.path.join(tmpdir, "test.txt")
# with open(test_file, "w") as f:
# f.write("test")
# # 退出后 tmpdir 及其内容被自动删除基于生成器的上下文管理器:计时与临时环境
from contextlib import contextmanager
import time
import os
from typing import Generator
@contextmanager
def timer(label: str = "代码块") -> Generator:
"""计时代码块执行时间"""
start = time.perf_counter()
print(f"[{label}] 开始执行...")
try:
yield
finally:
elapsed = time.perf_counter() - start
print(f"[{label}] 执行完成,耗时: {elapsed:.4f}s")
@contextmanager
def temp_env(**env_vars) -> Generator:
"""临时设置环境变量,退出后自动恢复"""
old_values = {}
for key, value in env_vars.items():
old_values[key] = os.environ.get(key)
os.environ[key] = str(value)
try:
yield
finally:
for key, old_value in old_values.items():
if old_value is None:
os.environ.pop(key, None)
else:
os.environ[key] = old_value
@contextmanager
def suppress_exceptions(*exceptions) -> Generator:
"""抑制指定异常(类似 contextlib.suppress)"""
try:
yield
except exceptions as e:
print(f"已抑制异常: {type(e).__name__}: {e}")
# 使用
with timer("数据处理"):
data = [i ** 2 for i in range(1_000_000)]
print(f"计算了 {len(data)} 个值")
with temp_env(DATABASE_URL="sqlite:///test.db", DEBUG="1"):
print(os.environ["DATABASE_URL"]) # sqlite:///test.db
# 退出后环境变量自动恢复更多 contextmanager 实用示例
from contextlib import contextmanager
import sys
import io
import logging
from typing import Generator
# 1. 日志级别临时切换
@contextmanager
def log_level(logger: logging.Logger, level: int) -> Generator:
"""临时切换日志级别"""
old_level = logger.level
logger.setLevel(level)
try:
yield
finally:
logger.setLevel(old_level)
# 2. 工作目录临时切换
@contextmanager
def change_directory(path: str) -> Generator:
"""临时切换工作目录"""
old_cwd = os.getcwd()
os.chdir(path)
try:
yield os.getcwd()
finally:
os.chdir(old_cwd)
# 3. 标准输出捕获
@contextmanager
def capture_output() -> Generator[io.StringIO, None, None]:
"""捕获 stdout 输出"""
old_stdout = sys.stdout
buffer = io.StringIO()
sys.stdout = buffer
try:
yield buffer
finally:
sys.stdout = old_stdout
# 4. 重试上下文
@contextmanager
def retry_on_failure(max_attempts: int = 3, delay: float = 1.0) -> Generator:
"""重试上下文管理器"""
attempt = 0
while attempt < max_attempts:
attempt += 1
try:
yield
return # 成功则退出
except Exception as e:
if attempt >= max_attempts:
raise
print(f"第 {attempt} 次失败: {e}, {delay}s 后重试...")
time.sleep(delay)
# 5. 性能计时上下文(返回计时结果)
@contextmanager
def stopwatch() -> Generator[dict, None, None]:
"""返回计时结果的计时器"""
result = {"start": None, "end": None, "elapsed": None}
result["start"] = time.perf_counter()
try:
yield result
finally:
result["end"] = time.perf_counter()
result["elapsed"] = result["end"] - result["start"]
# 使用示例
with stopwatch() as t:
sum(range(10_000_000))
print(f"耗时: {t['elapsed']:.4f}s")ExitStack 与动态资源管理
from contextlib import ExitStack, contextmanager
from typing import Generator
@contextmanager
def acquire_lock(lock_name: str) -> Generator:
"""模拟分布式锁获取"""
print(f"获取锁: {lock_name}")
yield lock_name
print(f"释放锁: {lock_name}")
def process_multiple_resources(resource_ids: list[int]):
"""动态管理多个资源"""
with ExitStack() as stack:
# 动态获取多个锁
locks = [
stack.enter_context(acquire_lock(f"resource_{rid}"))
for rid in resource_ids
]
print(f"已获取所有锁: {locks}")
# 执行需要多锁保护的操作
result = sum(resource_ids) * 2
print(f"处理结果: {result}")
return result
# 所有锁在 ExitStack 退出时自动释放
# 使用
process_multiple_resources([1, 2, 3])
# redirect_stdout 捕获输出示例
from contextlib import redirect_stdout
import io
def capture_print_output(func, *args):
"""捕获函数的 print 输出"""
buffer = io.StringIO()
with redirect_stdout(buffer):
func(*args)
return buffer.getvalue()ExitStack 高级用法
from contextlib import ExitStack, suppress, nullcontext
from typing import Optional
# 1. 条件性资源获取
def conditional_open(filepath: Optional[str] = None):
"""根据条件决定是否打开文件"""
with ExitStack() as stack:
if filepath:
f = stack.enter_context(open(filepath, "r"))
data = f.read()
else:
data = "default data"
# 无论文件是否打开,退出时都会正确清理
print(f"数据: {data[:50]}")
# 2. 动态数量的上下文管理器
def batch_process(file_paths: list[str]):
"""批量打开多个文件"""
with ExitStack() as stack:
files = [stack.enter_context(open(fp, "r")) for fp in file_paths]
# 逐行读取所有文件
for f in files:
for line in f:
yield line.strip()
# 所有文件在退出时自动关闭
# 3. 回调注册
def register_cleanup(callback):
"""注册退出时执行的回调"""
with ExitStack() as stack:
stack.callback(callback, "cleanup_arg")
# 执行业务逻辑
print("执行业务...")
# callback 会在退出时被调用
# 4. nullcontext —— 无操作的上下文管理器(Python 3.7+)
def maybe_open_file(filepath: Optional[str], mode: str = "r"):
"""可选地打开文件"""
ctx = open(filepath, mode) if filepath else nullcontext()
with ctx as f:
if f is not None:
return f.read()
return None
# 5. suppress —— 异常抑制
from contextlib import suppress
with suppress(FileNotFoundError):
os.remove("nonexistent_file.txt") # 不会抛出异常异步上下文管理器
import asyncio
from typing import AsyncGenerator
from contextlib import asynccontextmanager
class AsyncDBPool:
"""异步数据库连接池的模拟"""
def __init__(self, pool_size: int = 5):
self.pool_size = pool_size
self._connections = []
async def __aenter__(self):
print(f"初始化连接池,大小: {self.pool_size}")
self._connections = [f"conn_{i}" for i in range(self.pool_size)]
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("关闭所有连接")
self._connections.clear()
return False
async def query(self, sql: str):
conn = self._connections[0]
await asyncio.sleep(0.1) # 模拟异步查询
return [{"conn": conn, "sql": sql}]
@asynccontextmanager
async def temp_database(db_name: str) -> AsyncGenerator:
"""临时数据库上下文"""
print(f"创建临时数据库: {db_name}")
db = {"name": db_name, "data": {}}
try:
yield db
finally:
print(f"清理临时数据库: {db_name}")
db.clear()
async def main():
# 异步类上下文管理器
async with AsyncDBPool(pool_size=3) as pool:
results = await pool.query("SELECT * FROM users")
print(f"查询结果: {results}")
# 异步生成器上下文管理器
async with temp_database("test_db") as db:
db["data"]["key"] = "value"
print(f"写入数据: {db}")
# asyncio.run(main())异步上下文管理器进阶
import asyncio
from contextlib import asynccontextmanager
# 异步 Redis 连接池管理
class AsyncRedisPool:
"""模拟异步 Redis 连接池"""
def __init__(self, host: str = "localhost", port: int = 6379,
max_connections: int = 10):
self.host = host
self.port = port
self.max_connections = max_connections
self._pool = []
self._available = asyncio.Queue()
async def __aenter__(self):
for i in range(self.max_connections):
conn = {"id": i, "host": self.host, "port": self.port}
await self._available.put(conn)
self._pool.append(conn)
print(f"连接池初始化完成: {self.max_connections} 个连接")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self._pool.clear()
print("连接池已关闭")
return False
@asynccontextmanager
async def get_connection(self):
"""获取单个连接(使用完毕自动归还)"""
conn = await self._available.get()
try:
yield conn
finally:
await self._available.put(conn)
async def execute(self, command: str, *args):
"""执行命令"""
async with self.get_connection() as conn:
await asyncio.sleep(0.01) # 模拟网络延迟
return {"command": command, "args": args, "conn_id": conn["id"]}
async def demo_async_pool():
async with AsyncRedisPool(max_connections=3) as pool:
# 并发执行多个命令
tasks = [
pool.execute("GET", f"key:{i}")
for i in range(10)
]
results = await asyncio.gather(*tasks)
for r in results:
print(f" 结果: {r}")
# asyncio.run(demo_async_pool())优点
缺点
总结
上下文管理器通过 with 语句为资源管理提供了确定性保证,是 Python 工程中防止资源泄漏的标准手段。基于类的实现适合复杂状态管理,基于 @contextmanager 的实现适合简单场景。结合 ExitStack 可以动态管理不确定数量的资源,异步上下文管理器则覆盖了 async/await 场景。
关键知识点
- enter 返回的对象绑定到 with ... as var 的 var 变量
- exit 返回 True 会抑制异常,返回 False 则继续传播
- @contextmanager 装饰器将生成器函数转换为上下文管理器,yield 前是 enter,yield 后是 exit
- 异步上下文管理器使用 aenter 和 aexit 配合 async with 使用
- ExitStack 可以动态管理不确定数量的上下文管理器
- nullcontext 用于条件性的上下文管理
项目落地视角
- 数据库连接、文件句柄、网络连接、锁等资源全部用上下文管理器管理
- 项目中创建通用的计时、日志、事务上下文管理器放入 utils/context.py
- FastAPI 的依赖注入天然支持上下文管理器,可直接 yield 资源
- 测试中使用上下文管理器管理 mock 对象的创建和清理
- 使用 ExitStack 处理需要打开数量不确定的资源场景
常见误区
- 在 exit 中忘记关闭资源或忘记处理异常
- 使用 @contextmanager 时在 yield 前抛出异常导致资源未初始化
- 误解 exit 返回值的作用,意外抑制了本该暴露的异常
- 在异步代码中使用同步上下文管理器,导致阻塞事件循环
- 在 @contextmanager 中忘记使用 try/finally 包裹 yield,异常时资源泄漏
- ExitStack 中注册的回调函数如果自身抛出异常,会覆盖原始异常
进阶路线
- 深入学习 contextlib 模块:suppress、redirect_stdout、nullcontext 等
- 研究 FastAPI 依赖注入系统中上下文管理器的应用模式
- 学习 contextvars 模块实现上下文级别的变量传递
- 探索异步上下文管理器在数据库连接池管理中的应用
适用场景
- 文件、网络、数据库等需要显式释放的资源管理
- 需要临时修改环境(环境变量、工作目录、权限)后自动恢复的场景
- 分布式锁、事务管理等需要保证"获取-释放"配对的操作
落地建议
- 将项目中的资源获取/释放逻辑统一封装为上下文管理器
- 为自定义上下文管理器编写测试,验证正常退出和异常退出两种路径
- 在代码规范中要求所有资源操作必须使用 with 语句
- 多资源场景使用 ExitStack 或嵌套 with(Python 3.1+ 支持单行多 with)
排错清单
- 检查 exit 是否正确处理了所有异常情况,特别是资源清理
- 确认嵌套 with 语句中资源的获取和释放顺序是否正确
- 排查异步场景中是否误用了同步上下文管理器
- 检查 @contextmanager 中 yield 是否被 try/finally 包裹
- 确认 exit 的返回值是否符合预期(是否需要抑制异常)
复盘问题
- 你的项目中是否存在未使用上下文管理器的资源操作?潜在风险是什么?
- 当上下文管理器嵌套层数超过 3 层时,是否有更清晰的组织方式?
- 团队是否对 exit 的异常处理行为有统一认识?
