Python 设计模式实战
大约 10 分钟约 2997 字
Python 设计模式实战
简介
Python 的设计模式实现与静态语言不同,利用鸭子类型、装饰器、生成器等 Pythonic 特性,可以更简洁地实现经典设计模式。理解 Python 风格的设计模式,有助于编写更优雅、更 Pythonic 的代码。
特点
创建型模式
Pythonic 单例与工厂
from dataclasses import dataclass, field
from functools import wraps
from typing import Protocol
# 1. 单例模式 — 使用模块级别变量(最 Pythonic)
# config.py
class _Config:
def __init__(self):
self.debug = False
self.database_url = "sqlite:///default.db"
self.cache_ttl = 300
# 模块级别实例(天然单例)
config = _Config()
# 或使用元类
class SingletonMeta(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]
class Database(metaclass=SingletonMeta):
def __init__(self, url: str):
self.url = url
self.connection = None
def connect(self):
print(f"连接到: {self.url}")
# 2. 工厂模式 — 使用 Protocol 和类型注册
class PaymentHandler(Protocol):
def pay(self, amount: float) -> bool: ...
class AlipayHandler:
def pay(self, amount: float) -> bool:
print(f"支付宝支付: ¥{amount}")
return True
class WechatPayHandler:
def pay(self, amount: float) -> bool:
print(f"微信支付: ¥{amount}")
return True
class BankCardHandler:
def pay(self, amount: float) -> bool:
print(f"银行卡支付: ¥{amount}")
return True
# 工厂注册表
class PaymentFactory:
_handlers: dict[str, type] = {}
@classmethod
def register(cls, name: str):
"""注册处理器装饰器"""
def decorator(handler_cls):
cls._handlers[name] = handler_cls
return handler_cls
return decorator
@classmethod
def create(cls, name: str) -> PaymentHandler:
if name not in cls._handlers:
raise ValueError(f"不支持的支付方式: {name}")
return cls._handlers[name]()
# 注册处理器
PaymentFactory.register("alipay")(AlipayHandler)
PaymentFactory.register("wechat")(WechatPayHandler)
PaymentFactory.register("bank")(BankCardHandler)
# 使用
handler = PaymentFactory.create("alipay")
handler.pay(99.9)
# 3. 建造者模式 — 使用链式调用 + dataclass
@dataclass
class QueryBuilder:
table: str = ""
fields: list[str] = field(default_factory=list)
conditions: list[str] = field(default_factory=list)
order_by: str = ""
limit_val: int = 0
def select(self, *fields) -> "QueryBuilder":
self.fields = list(fields)
return self
def from_table(self, table: str) -> "QueryBuilder":
self.table = table
return self
def where(self, condition: str) -> "QueryBuilder":
self.conditions.append(condition)
return self
def order(self, field: str) -> "QueryBuilder":
self.order_by = field
return self
def limit(self, n: int) -> "QueryBuilder":
self.limit_val = n
return self
def build(self) -> str:
fields = ", ".join(self.fields) if self.fields else "*"
sql = f"SELECT {fields} FROM {self.table}"
if self.conditions:
sql += f" WHERE {' AND '.join(self.conditions)}"
if self.order_by:
sql += f" ORDER BY {self.order_by}"
if self.limit_val:
sql += f" LIMIT {self.limit_val}"
return sql
# 使用
query = (QueryBuilder()
.select("id", "name", "email")
.from_table("users")
.where("age > 18")
.where("status = 'active'")
.order("name")
.limit(20)
.build())
# SELECT id, name, email FROM users WHERE age > 18 AND status = 'active' ORDER BY name LIMIT 20结构型模式
装饰器与代理
# 1. 装饰器模式 — Python 原生装饰器
import time
import functools
import logging
def retry(max_attempts=3, delay=1.0, exceptions=(Exception,)):
"""重试装饰器"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except exceptions as e:
if attempt == max_attempts - 1:
raise
wait = delay * (2 ** attempt)
logging.warning(f"第 {attempt + 1} 次重试: {e}")
time.sleep(wait)
return wrapper
return decorator
def cache_result(ttl=300):
"""带 TTL 的缓存装饰器"""
def decorator(func):
cache = {}
@functools.wraps(func)
def wrapper(*args):
key = str(args)
if key in cache:
result, timestamp = cache[key]
if time.time() - timestamp < ttl:
return result
result = func(*args)
cache[key] = (result, time.time())
return result
return wrapper
return decorator
def measure_time(func):
"""计时装饰器"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
result = func(*args, **kwargs)
elapsed = (time.perf_counter() - start) * 1000
logging.info(f"{func.__name__} 耗时: {elapsed:.2f}ms")
return result
return wrapper
# 使用
@retry(max_attempts=3, delay=1.0, exceptions=(ConnectionError,))
@cache_result(ttl=60)
@measure_time
def fetch_data(url):
return requests.get(url).json()
# 2. 代理模式 — 使用 __getattr__
class RemoteServiceProxy:
"""远程服务代理(带缓存和重试)"""
def __init__(self, base_url: str):
self.base_url = base_url
self._cache = {}
def __getattr__(self, name):
"""动态代理方法调用"""
def method(*args, **kwargs):
# 检查缓存
cache_key = f"{name}:{args}:{kwargs}"
if cache_key in self._cache:
return self._cache[cache_key]
# 远程调用
url = f"{self.base_url}/{name}"
response = requests.post(url, json={"args": args, "kwargs": kwargs})
result = response.json()
# 缓存
self._cache[cache_key] = result
return result
return method
# 使用
service = RemoteServiceProxy("https://api.example.com")
result = service.get_user(user_id=123) # 自动代理为 HTTP 请求
# 3. 上下文管理器模式
from contextlib import contextmanager
@contextmanager
def timer(name: str):
"""计时上下文管理器"""
start = time.perf_counter()
yield
elapsed = (time.perf_counter() - start) * 1000
print(f"{name}: {elapsed:.2f}ms")
@contextmanager
def database_transaction(conn):
"""事务上下文管理器"""
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
# 使用
with timer("数据处理"):
process_data()
with database_transaction(db) as conn:
conn.execute("INSERT INTO users ...")行为型模式
观察者与策略
# 1. 观察者模式 — 使用 asyncio
from dataclasses import dataclass, field
from typing import Callable
@dataclass
class EventEmitter:
"""事件发射器"""
_listeners: dict[str, list[Callable]] = field(default_factory=dict)
def on(self, event: str, listener: Callable):
"""注册监听器"""
if event not in self._listeners:
self._listeners[event] = []
self._listeners[event].append(listener)
return self # 支持链式调用
def off(self, event: str, listener: Callable):
"""移除监听器"""
if event in self._listeners:
self._listeners[event].remove(listener)
def emit(self, event: str, *args, **kwargs):
"""发射事件"""
for listener in self._listeners.get(event, []):
listener(*args, **kwargs)
# 使用
emitter = EventEmitter()
emitter.on("user_created", lambda user: print(f"发送欢迎邮件: {user}"))
emitter.on("user_created", lambda user: print(f"记录日志: 创建用户 {user}"))
emitter.on("order_placed", lambda order: print(f"通知仓库: {order}"))
emitter.emit("user_created", {"name": "张三", "email": "test@example.com"})
# 2. 策略模式 — 使用函数和 Protocol
from typing import Protocol
class SortStrategy(Protocol):
def sort(self, data: list) -> list: ...
def bubble_sort(data: list) -> list:
result = data.copy()
n = len(result)
for i in range(n):
for j in range(0, n - i - 1):
if result[j] > result[j + 1]:
result[j], result[j + 1] = result[j + 1], result[j]
return result
def quick_sort(data: list) -> list:
if len(data) <= 1:
return data
pivot = data[len(data) // 2]
left = [x for x in data if x < pivot]
middle = [x for x in data if x == pivot]
right = [x for x in data if x > pivot]
return quick_sort(left) + middle + quick_sort(right)
def python_sort(data: list) -> list:
return sorted(data)
class Sorter:
def __init__(self, strategy=None):
self.strategy = strategy or python_sort
def set_strategy(self, strategy):
self.strategy = strategy
def sort(self, data: list) -> list:
return self.strategy(data)
# 使用
sorter = Sorter(quick_sort)
result = sorter.sort([3, 1, 4, 1, 5, 9])
sorter.set_strategy(bubble_sort)
result2 = sorter.sort([3, 1, 4, 1, 5, 9])
# 3. 状态机模式
from enum import Enum, auto
class OrderState(Enum):
CREATED = auto()
PAID = auto()
SHIPPED = auto()
DELIVERED = auto()
CANCELLED = auto()
class OrderStateMachine:
TRANSITIONS = {
(OrderState.CREATED, "pay"): OrderState.PAID,
(OrderState.PAID, "ship"): OrderState.SHIPPED,
(OrderState.SHIPPED, "deliver"): OrderState.DELIVERED,
(OrderState.CREATED, "cancel"): OrderState.CANCELLED,
(OrderState.PAID, "cancel"): OrderState.CANCELLED,
}
def __init__(self):
self.state = OrderState.CREATED
self._listeners = []
def transition(self, action: str):
key = (self.state, action)
if key not in self.TRANSITIONS:
raise ValueError(f"不允许: {self.state.name} + {action}")
old_state = self.state
self.state = self.TRANSITIONS[key]
self._notify(old_state, self.state, action)
def _notify(self, old, new, action):
for listener in self._listeners:
listener(old, new, action)
def on_transition(self, listener):
self._listeners.append(listener)
# 使用
order = OrderStateMachine()
order.on_transition(lambda o, n, a: print(f"{o.name} --[{a}]--> {n.name}"))
order.transition("pay") # CREATED --[pay]--> PAID
order.transition("ship") # PAID --[ship]--> SHIPPED
order.transition("deliver") # SHIPPED --[deliver]--> DELIVERED优点
缺点
总结
Python 设计模式利用语言特性实现更简洁的版本:单例用模块级变量、工厂用注册表+装饰器、建造者用链式调用。装饰器模式使用 Python 原生 @decorator 语法,支持参数化和堆叠。代理模式通过 __getattr__ 实现动态代理。观察者模式用 EventEmitter 封装回调。策略模式直接使用函数(一等公民)而非类。状态机使用 Enum + 转换表。建议优先使用 Pythonic 的实现方式,避免从 Java/C# 直接翻译。
关键知识点
- 先区分这篇内容属于语法能力、工程能力,还是生态工具能力。
- Python 的开发效率来自生态,但可维护性来自结构、测试和规范。
- 脚本一旦进入长期维护,就必须按项目来治理。
- 框架与语言特性类主题要同时理解运行方式和工程组织方式。
项目落地视角
- 统一虚拟环境、依赖锁定、格式化和日志方案。
- 把入口、配置、业务逻辑和工具函数拆开,避免单文件膨胀。
- 对网络请求、文件读写和数据处理结果做异常与样本校验。
- 明确项目入口、配置管理、依赖管理、日志和测试策略。
常见误区
- 把临时脚本直接当生产代码使用。
- 忽略依赖版本、编码、路径和时区差异。
- 只会写 happy path,没有补超时、重试和资源释放。
- 把 notebook 或脚本风格直接带入长期维护项目。
进阶路线
- 把类型注解、测试、打包和部署纳入统一工程流程。
- 继续向异步、性能、数据管线和框架源码层深入。
- 把常用脚本抽成可复用库或 CLI 工具,而不是复制粘贴。
- 继续补齐部署、打包、监控和性能调优能力。
适用场景
- 当你准备把《Python 设计模式实战》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合脚本自动化、数据处理、Web 开发和测试工具建设。
- 当需求强调快速迭代和丰富生态时,Python 往往能快速起步。
落地建议
- 统一使用虚拟环境与依赖锁定,避免环境漂移。
- 对核心函数补类型注解、异常处理和日志,减少“脚本黑盒”。
- 一旦脚本进入生产链路,及时补测试和监控。
排错清单
- 先确认当前解释器、虚拟环境和依赖版本是否正确。
- 检查编码、路径、时区和第三方库行为差异。
- 排查同步阻塞、数据库连接未释放或网络请求无超时。
复盘问题
- 如果把《Python 设计模式实战》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《Python 设计模式实战》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《Python 设计模式实战》最大的收益和代价分别是什么?
并发模式
Future 与异步任务
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any
# 1. 线程池 + Future 模式
class AsyncTaskRunner:
"""异步任务执行器(适合 IO 密集型)"""
def __init__(self, max_workers=10):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
def submit_tasks(self, tasks: list[tuple[callable, tuple, dict]]) -> list[Any]:
"""提交多个任务并收集结果"""
futures = {}
for func, args, kwargs in tasks:
future = self.executor.submit(func, *args, **kwargs)
futures[future] = func.__name__
results = []
for future in as_completed(futures):
name = futures[future]
try:
result = future.result(timeout=30)
results.append({"task": name, "status": "success", "result": result})
except Exception as e:
results.append({"task": name, "status": "error", "error": str(e)})
return results
# 使用:并发请求多个 API
import requests
runner = AsyncTaskRunner(max_workers=5)
tasks = [
(requests.get, ("https://api.example.com/users",), {"timeout": 10}),
(requests.get, ("https://api.example.com/orders",), {"timeout": 10}),
(requests.get, ("https://api.example.com/products",), {"timeout": 10}),
]
results = runner.submit_tasks(tasks)
# 2. asyncio + 生产者-消费者模式
@dataclass
class Job:
id: int
data: dict
async def producer(queue: asyncio.Queue, count: int):
"""生产者:生成任务放入队列"""
for i in range(count):
job = Job(id=i, data={"payload": f"data-{i}"})
await queue.put(job)
print(f"生产: Job-{i}")
await asyncio.sleep(0.01)
async def worker(queue: asyncio.Queue, worker_id: int):
"""消费者:从队列取出任务处理"""
while True:
job = await queue.get()
try:
await asyncio.sleep(0.05) # 模拟 IO 操作
print(f"Worker-{worker_id} 完成: Job-{job.id}")
finally:
queue.task_done()
async def run_producer_consumer():
queue = asyncio.Queue(maxsize=20)
# 启动生产者和多个消费者
producers = [asyncio.create_task(producer(queue, 50))]
consumers = [asyncio.create_task(worker(queue, i)) for i in range(3)]
await asyncio.gather(*producers)
await queue.join() # 等待所有任务完成
for c in consumers:
c.cancel() # 关闭消费者
# asyncio.run(run_producer_consumer())Actor 模式(基于 asyncio)
import asyncio
from dataclasses import dataclass, field
@dataclass
class Actor:
"""基于 asyncio 的 Actor 模式"""
name: str
_queue: asyncio.Queue = field(default_factory=asyncio.Queue)
_running: bool = False
async def start(self):
self._running = True
while self._running:
message = await self._queue.get()
await self.receive(message)
async def stop(self):
self._running = False
await self._queue.put(None) # 发送终止信号
async def send(self, message):
await self._queue.put(message)
async def receive(self, message):
"""子类重写此方法处理消息"""
print(f"[{self.name}] 收到消息: {message}")
class CounterActor(Actor):
"""计数器 Actor"""
def __init__(self, name: str):
super().__init__(name)
self.count = 0
async def receive(self, message):
if message == "increment":
self.count += 1
print(f"[{self.name}] count = {self.count}")
elif message == "get":
print(f"[{self.name}] 当前 count = {self.count}")
elif message is None:
self._running = False适配器与桥接模式
# 适配器模式 — 让不兼容的接口协同工作
class OldEmailService:
"""旧系统接口"""
def send_email(self, to, subject, body):
print(f"[旧接口] 发送邮件到 {to}: {subject}")
class NewNotificationService:
"""新系统接口"""
def notify(self, user_id: str, message: str):
print(f"[新接口] 通知用户 {user_id}: {message}")
class EmailAdapter:
"""适配器:将新接口包装为旧接口"""
def __init__(self, new_service: NewNotificationService):
self.new_service = new_service
def send_email(self, to, subject, body):
# 转换调用
self.new_service.notify(user_id=to, message=f"{subject}\n{body}")
# 使用 — 旧代码无需修改
def send_welcome(email_service, user_email):
email_service.send_email(user_email, "欢迎", "注册成功")
old_service = OldEmailService()
new_service = NewNotificationService()
adapter = EmailAdapter(new_service)
send_welcome(old_service, "user@example.com") # 旧接口
send_welcome(adapter, "user@example.com") # 通过适配器使用新接口