观察者模式详解
大约 10 分钟约 3043 字
观察者模式详解
简介
观察者模式(Observer Pattern)是一种行为型设计模式,它定义了对象之间一对多的依赖关系,当一个对象的状态发生变化时,所有依赖于它的对象都会得到通知并自动更新。在 C# 中,观察者模式有着丰富的实现方式,包括传统的事件(event)机制、IObservable<T> / IObserver<T> 接口,以及通过 MediatR 库实现的通知模式。
特点
基于事件的观察者模式
C# 的 event 机制是实现观察者模式最直接的方式。
// 自定义事件参数
public class OrderCreatedEventArgs : EventArgs
{
public int OrderId { get; }
public string CustomerName { get; }
public decimal Amount { get; }
public DateTime CreatedAt { get; }
public OrderCreatedEventArgs(int orderId, string customerName, decimal amount)
{
OrderId = orderId;
CustomerName = customerName;
Amount = amount;
CreatedAt = DateTime.UtcNow;
}
}
// 发布者(Subject)
public class OrderService
{
// 定义事件
public event EventHandler<OrderCreatedEventArgs>? OrderCreated;
public event EventHandler<OrderCreatedEventArgs>? OrderCancelled;
public void CreateOrder(int orderId, string customer, decimal amount)
{
Console.WriteLine($"订单 {orderId} 已创建,金额: {amount:C}");
// 触发事件
OrderCreated?.Invoke(this,
new OrderCreatedEventArgs(orderId, customer, amount));
}
public void CancelOrder(int orderId, string customer, decimal amount)
{
Console.WriteLine($"订单 {orderId} 已取消");
OrderCancelled?.Invoke(this,
new OrderCreatedEventArgs(orderId, customer, amount));
}
}订阅者实现
// 邮件通知订阅者
public class EmailNotificationHandler
{
public void OnOrderCreated(object? sender, OrderCreatedEventArgs e)
{
Console.WriteLine($"[邮件] 发送订单确认邮件给 {e.CustomerName},订单号: {e.OrderId}");
}
}
// 库存更新订阅者
public class InventoryHandler
{
public void OnOrderCreated(object? sender, OrderCreatedEventArgs e)
{
Console.WriteLine($"[库存] 扣减库存,订单号: {e.OrderId}");
}
public void OnOrderCancelled(object? sender, OrderCreatedEventArgs e)
{
Console.WriteLine($"[库存] 恢复库存,订单号: {e.OrderId}");
}
}
// 积分发放订阅者
public class PointsHandler
{
public void OnOrderCreated(object? sender, OrderCreatedEventArgs e)
{
var points = (int)(e.Amount * 10);
Console.WriteLine($"[积分] 为 {e.CustomerName} 发放 {points} 积分");
}
}
// 使用示例
var orderService = new OrderService();
var emailHandler = new EmailNotificationHandler();
var inventoryHandler = new InventoryHandler();
var pointsHandler = new PointsHandler();
// 订阅事件
orderService.OrderCreated += emailHandler.OnOrderCreated;
orderService.OrderCreated += inventoryHandler.OnOrderCreated;
orderService.OrderCreated += pointsHandler.OnOrderCreated;
orderService.OrderCancelled += inventoryHandler.OnOrderCancelled;
// 创建订单 - 自动通知所有订阅者
orderService.CreateOrder(1001, "张三", 299.99m);IObservable<T> 与 IObserver<T>
.NET 提供了标准的观察者接口 IObservable<T> 和 IObserver<T>,适合处理数据流场景。
// 实现 IObservable<T> - 股票价格发布者
public class StockPriceProvider : IObservable<decimal>
{
private readonly List<IObserver<decimal>> _observers = new();
private readonly Random _random = new();
public IDisposable Subscribe(IObserver<decimal> observer)
{
if (!_observers.Contains(observer))
_observers.Add(observer);
return new Unsubscriber(_observers, observer);
}
public void UpdatePrice(string symbol, decimal price)
{
foreach (var observer in _observers)
{
observer.OnNext(price);
}
}
public void Complete()
{
foreach (var observer in _observers)
observer.OnCompleted();
_observers.Clear();
}
public void ReportError(Exception ex)
{
foreach (var observer in _observers)
observer.OnError(ex);
}
private class Unsubscriber : IDisposable
{
private readonly List<IObserver<decimal>> _observers;
private readonly IObserver<decimal> _observer;
public Unsubscriber(List<IObserver<decimal>> observers, IObserver<decimal> observer)
{
_observers = observers;
_observer = observer;
}
public void Dispose()
{
_observers.Remove(_observer);
}
}
}
// 实现 IObserver<T> - 价格显示面板
public class PriceDisplay : IObserver<decimal>
{
private readonly string _name;
public PriceDisplay(string name) => _name = name;
public void OnNext(decimal value)
{
Console.WriteLine($"[{_name}] 收到价格更新: {value:C}");
}
public void OnError(Exception error)
{
Console.WriteLine($"[{_name}] 发生错误: {error.Message}");
}
public void OnCompleted()
{
Console.WriteLine($"[{_name}] 价格更新已完成");
}
}
// 使用
var provider = new StockPriceProvider();
var display1 = new PriceDisplay("面板A");
var display2 = new PriceDisplay("面板B");
IDisposable sub1 = provider.Subscribe(display1);
IDisposable sub2 = provider.Subscribe(display2);
provider.UpdatePrice("AAPL", 175.50m);
provider.UpdatePrice("AAPL", 176.20m);
sub1.Dispose(); // 取消面板A的订阅
provider.UpdatePrice("AAPL", 177.00m); // 只有面板B收到MediatR 通知模式
MediatR 是 .NET 中流行的中介者库,其 Notification 机制提供了一种解耦的观察者实现。
// 安装 MediatR
// dotnet add package MediatR
// 定义通知
public record OrderPlacedNotification(
int OrderId,
string CustomerName,
decimal Amount,
DateTime PlacedAt) : INotification;
// 通知处理器 - 各处理器互不依赖
public class SendOrderConfirmationHandler : INotificationHandler<OrderPlacedNotification>
{
private readonly ILogger<SendOrderConfirmationHandler> _logger;
public SendOrderConfirmationHandler(ILogger<SendOrderConfirmationHandler> logger)
{
_logger = logger;
}
public async Task Handle(OrderPlacedNotification notification, CancellationToken cancellationToken)
{
_logger.LogInformation("发送订单确认邮件给 {Customer},订单号: {OrderId}",
notification.CustomerName, notification.OrderId);
// 模拟发送邮件
await Task.Delay(100, cancellationToken);
}
}
public class UpdateInventoryHandler : INotificationHandler<OrderPlacedNotification>
{
public async Task Handle(OrderPlacedNotification notification, CancellationToken cancellationToken)
{
Console.WriteLine($"[库存] 扣减库存 - 订单号: {notification.OrderId}");
await Task.Delay(50, cancellationToken);
}
}
public class AwardPointsHandler : INotificationHandler<OrderPlacedNotification>
{
public async Task Handle(OrderPlacedNotification notification, CancellationToken cancellationToken)
{
var points = (int)(notification.Amount * 10);
Console.WriteLine($"[积分] 发放 {points} 积分给 {notification.CustomerName}");
await Task.CompletedTask;
}
}
// 在 ASP.NET Core 中发布通知
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
private readonly IMediator _mediator;
public OrdersController(IMediator mediator)
{
_mediator = mediator;
}
[HttpPost]
public async Task<IActionResult> PlaceOrder([FromBody] PlaceOrderRequest request)
{
// 处理订单逻辑...
// 发布通知,所有处理器都会收到
await _mediator.Publish(new OrderPlacedNotification(
OrderId: 1001,
CustomerName: request.CustomerName,
Amount: request.Amount,
PlacedAt: DateTime.UtcNow));
return Ok(new { OrderId = 1001, Status = "已创建" });
}
}注册 MediatR
// Program.cs
var builder = WebApplication.CreateBuilder(args);
// 注册 MediatR,自动扫描程序集中的处理器
builder.Services.AddMediatR(cfg =>
{
cfg.RegisterServicesFromAssembly(typeof(Program).Assembly);
});优点
高级用法与实战
弱事件模式 — 防止内存泄漏
// 问题:观察者忘记取消订阅导致内存泄漏
// 当订阅者生命周期比发布者短时,发布者持有订阅者的引用
// 导致订阅者无法被 GC 回收
// 解决方案1:使用 WeakEventManager(WPF 内置)
// 解决方案2:实现弱引用观察者
public class WeakObserver<T> where T : class
{
private readonly WeakReference<T> _reference;
private readonly Action<T, object?> _callback;
public WeakObserver(T target, Action<T, object?> callback)
{
_reference = new WeakReference<T>(target);
_callback = callback;
}
public bool TryInvoke(object? arg)
{
if (_reference.TryGetTarget(out var target))
{
_callback(target, arg);
return true;
}
return false; // 目标已被回收
}
}
// 弱引用事件管理器
public class WeakEventPublisher
{
private readonly List<WeakObserver<Action<string>>> _observers = new();
public void Subscribe(Action<string> handler)
{
_observers.Add(new WeakObserver<Action<string>>(handler, (h, msg) => h((string)msg!)));
}
public void Publish(string message)
{
// 清理已失效的弱引用
_observers.RemoveAll(o => !o.TryInvoke(message));
Console.WriteLine($"活跃订阅者: {_observers.Count}");
}
}线程安全的观察者
// 多线程环境下的观察者模式
public class ThreadSafeEventBus
{
private readonly ConcurrentDictionary<string, CopyOnWriteArrayList<Delegate>> _handlers = new();
public IDisposable Subscribe<T>(string topic, Action<T> handler)
{
var list = _handlers.GetOrAdd(topic, _ => new CopyOnWriteArrayList<Delegate>());
list.Add(handler);
return new Subscription(() => list.Remove(handler));
}
public void Publish<T>(string topic, T payload)
{
if (!_handlers.TryGetValue(topic, out var handlers)) return;
// 并行通知所有订阅者
Parallel.ForEach(handlers.Cast<Action<T>>(), handler =>
{
try
{
handler(payload);
}
catch (Exception ex)
{
Console.WriteLine($"订阅者异常: {ex.Message}");
}
});
}
// 顺序通知(保持顺序)
public void PublishSequential<T>(string topic, T payload)
{
if (!_handlers.TryGetValue(topic, out var handlers)) return;
foreach (var handler in handlers.Cast<Action<T>>())
{
try { handler(payload); }
catch (Exception ex)
{
Console.WriteLine($"订阅者异常: {ex.Message}");
}
}
}
private class Subscription : IDisposable
{
private readonly Action _unsubscribe;
public Subscription(Action unsubscribe) => _unsubscribe = unsubscribe;
public void Dispose() => _unsubscribe();
}
}事件过滤与转换
// 带过滤条件的事件订阅
public class FilterableEventBus
{
private readonly Dictionary<Type, List<(Delegate Handler, Predicate<object> Filter)>> _subscriptions = new();
public IDisposable Subscribe<TEvent>(
Action<TEvent> handler,
Predicate<TEvent>? filter = null) where TEvent : class
{
var list = _subscriptions.GetOrAdd(typeof(TEvent), _ => new());
var entry = ((Delegate)handler, filter != null
? (Predicate<object>)(o => filter((TEvent)o))
: _ => true);
list.Add(entry);
return new Subscription(() => list.Remove(entry));
}
public void Publish<TEvent>(TEvent payload) where TEvent : class
{
if (!_subscriptions.TryGetValue(typeof(TEvent), out var list)) return;
foreach (var (handler, filter) in list)
{
if (filter(payload))
{
((Action<TEvent>)handler)(payload);
}
}
}
}
// 使用 — 只订阅金额大于100的订单事件
var bus = new FilterableEventBus();
bus.Subscribe<OrderCreatedEvent>(
e => Console.WriteLine($"大额订单: {e.OrderId}, 金额: {e.Amount}"),
filter: e => e.Amount > 100);
bus.Subscribe<OrderCreatedEvent>(
e => Console.WriteLine($"VIP订单: {e.OrderId}"),
filter: e => e.CustomerName == "VIP");事件聚合器(Event Aggregator)
// 进程内的通用事件聚合器 — 解耦发布者和订阅者
public interface IEventAggregator
{
void Subscribe<TEvent>(Action<TEvent> handler) where TEvent : class;
void Unsubscribe<TEvent>(Action<TEvent> handler) where TEvent : class;
void Publish<TEvent>(TEvent payload) where TEvent : class;
}
public class EventAggregator : IEventAggregator
{
private readonly ConcurrentDictionary<Type, List<Delegate>> _handlers = new();
public void Subscribe<TEvent>(Action<TEvent> handler) where TEvent : class
{
var list = _handlers.GetOrAdd(typeof(TEvent), _ => new List<Delegate>());
lock (list) { list.Add(handler); }
}
public void Unsubscribe<TEvent>(Action<TEvent> handler) where TEvent : class
{
if (_handlers.TryGetValue(typeof(TEvent), out var list))
{
lock (list) { list.Remove(handler); }
}
}
public void Publish<TEvent>(TEvent payload) where TEvent : class
{
if (!_handlers.TryGetValue(typeof(TEvent), out var handlers)) return;
// 快照列表防止迭代时修改
Delegate[] snapshot;
lock (handlers) { snapshot = handlers.ToArray(); }
foreach (var handler in snapshot)
{
((Action<TEvent>)handler)(payload);
}
}
}
// DI 注册为单例
// builder.Services.AddSingleton<IEventAggregator, EventAggregator>();
// 在不同模块中使用
public class OrderModule
{
private readonly IEventAggregator _eventBus;
public OrderModule(IEventAggregator eventBus) => _eventBus = eventBus;
public void CreateOrder(int orderId, decimal amount)
{
// ... 创建订单逻辑 ...
_eventBus.Publish(new OrderCreatedEvent(orderId, "张三", amount));
}
}
public class NotificationModule
{
public NotificationModule(IEventAggregator eventBus)
{
eventBus.Subscribe<OrderCreatedEvent>(OnOrderCreated);
}
private void OnOrderCreated(OrderCreatedEvent e)
{
Console.WriteLine($"发送通知: 订单 {e.OrderId}");
}
}观察者模式的性能优化
// 1. 批量发布 — 减少事件触发频率
public class BatchingEventPublisher
{
private readonly Queue<object> _eventQueue = new();
private readonly Timer _flushTimer;
private readonly int _batchSize;
private readonly Action<object> _publish;
public BatchingEventPublisher(Action<object> publish, int batchSize = 50, int intervalMs = 1000)
{
_publish = publish;
_batchSize = batchSize;
_flushTimer = new Timer(_ => Flush(), null, intervalMs, intervalMs);
}
public void Enqueue(object evt)
{
lock (_eventQueue)
{
_eventQueue.Enqueue(evt);
if (_eventQueue.Count >= _batchSize)
Flush();
}
}
private void Flush()
{
object[] batch;
lock (_eventQueue)
{
batch = _eventQueue.ToArray();
_eventQueue.Clear();
}
foreach (var evt in batch) _publish(evt);
}
}
// 2. 异步观察者 — 不阻塞发布者
public class AsyncEventBus
{
private readonly List<Func<object, Task>> _handlers = new();
public void Subscribe<T>(Func<T, Task> handler)
{
_handlers.Add(obj => handler((T)obj));
}
// 火并忘记模式
public async Task PublishAsync<T>(T payload)
{
var tasks = _handlers.Select(h => h(payload));
await Task.WhenAll(tasks);
}
// 带超时的发布
public async Task PublishWithTimeoutAsync<T>(T payload, TimeSpan timeout)
{
using var cts = new CancellationTokenSource(timeout);
var tasks = _handlers.Select(h => Task.Run(() => h(payload), cts.Token));
try { await Task.WhenAll(tasks); }
catch (OperationCanceledException)
{
Console.WriteLine($"部分订阅者处理超时 ({timeout.TotalSeconds}s)");
}
}
}缺点
总结
观察者模式在 C# 中有多种实现方式,每种方式都有其最佳适用场景。传统 event 机制适合组件内部通信;IObservable<T> 适合数据流和响应式编程场景;MediatR 通知模式适合大型应用中的跨模块解耦通信。在选择实现方式时,需要考虑是否需要异步处理、是否需要跨进程通信、以及系统的复杂度和团队的技术熟悉度。
关键知识点
- 模式不是目标,降低耦合和控制变化才是目标。
- 先找变化点、稳定点和协作边界,再决定是否引入模式。
- 同一个模式在不同规模下的收益和代价差异很大。
项目落地视角
- 优先画出参与对象、依赖方向和调用链,再落到代码。
- 把模式放到一个真实场景里,比如支付、规则引擎、工作流或插件扩展。
- 配合单元测试或契约测试,保证重构后的行为没有漂移。
常见误区
- 为了看起来“高级”而套模式。
- 把简单问题拆成过多抽象层,导致阅读和排障都变难。
- 只会背 UML,不会解释为什么这里需要这个模式。
进阶路线
- 继续关注模式之间的组合用法,而不是孤立记忆。
- 从业务建模、演进策略和团队协作角度看模式的适用性。
- 把模式结论沉淀为项目模板、基类或约束文档。
适用场景
- 当你准备把《观察者模式详解》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合在业务规则频繁变化、分支增多或对象协作复杂时引入。
- 当你希望提高扩展性,但又不想把系统拆得过度抽象时,这类主题很有参考价值。
落地建议
- 先识别变化点,再决定是否引入模式,而不是反过来套模板。
- 优先为模式的边界、依赖和调用路径画出简单结构图。
- 把模式落到一个明确场景,例如支付、规则计算、插件扩展或工作流。
排错清单
- 检查抽象层是否过多,导致调用路径和责任不清晰。
- 确认引入模式后是否真的减少了条件分支和重复代码。
- 警惕“为了模式而模式”,尤其是在简单业务里。
复盘问题
- 如果把《观察者模式详解》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《观察者模式详解》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《观察者模式详解》最大的收益和代价分别是什么?
