管道模式
大约 11 分钟约 3302 字
管道模式
简介
管道(Pipeline)将复杂的处理流程分解为一系列独立阶段,数据依次通过每个阶段处理。管道模式是责任链和装饰器的变体,强调数据的流式转换。
管道模式的思想来源于 Unix 哲学 —— "做一件事并做好它"。在 Unix/Linux 中,通过管道符 | 将简单命令串联成强大的数据处理流程。ASP.NET Core 的中间件管道是最著名的管道模式实现之一,每个中间件处理请求后传递给下一个。
管道模式的核心价值在于:将复杂的"大"处理流程拆分为若干个"小"步骤,每个步骤职责单一、可独立测试、可自由组合。这种分解不仅降低了复杂度,也使得每个步骤的变更不会影响其他步骤。
特点
结构分析
UML 类图
+--------------------+
| IPipelineStep |
| <TIn, TOut> |
+--------------------+
| +Process(input, |
| context, next) |
+--------------------+
^
|
+--------+--------+--------+
| | | |
+------+ +------+ +------+ +------+
|Step1 | |Step2 | |Step3 | |StepN |
+------+ +------+ +------+ +------+
+--------------------+
| PipelineBuilder |
| <TIn, TOut> |
+--------------------+
| -_steps: List |
| +Use<TStep>() |
| +Use(func) |
| +Build() |
+--------------------+数据流图
Input ---> [Step 1] ---> [Step 2] ---> [Step 3] ---> ... ---> [Step N] ---> Output
| | | |
PreProcess Transform Validate PostProcess
| | | |
PostProcess Log Sanitize Format实现
泛型管道
// 管道上下文
public class PipelineContext
{
public Dictionary<string, object> Items { get; } = new();
public List<string> Logs { get; } = new();
public bool IsCancelled { get; set; }
public Exception? Error { get; set; }
}
// 管道步骤接口
public interface IPipelineStep<TIn, TOut>
{
TOut Process(TIn input, PipelineContext context, Func<TIn, TOut> next);
}
// 管道构建器
public class PipelineBuilder<TIn, TOut>
{
private readonly List<object> _steps = new();
public PipelineBuilder<TIn, TOut> Use<TStep>() where TStep : IPipelineStep<TIn, TOut>, new()
{
_steps.Add(new TStep());
return this;
}
public PipelineBuilder<TIn, TOut> Use(Func<TIn, PipelineContext, Func<TIn, TOut>, TOut> step)
{
_steps.Add(step);
return this;
}
public Func<TIn, PipelineContext, TOut> Build()
{
// 从后向前构建委托链
Func<TIn, TOut> pipeline = input => default!;
for (int i = _steps.Count - 1; i >= 0; i--)
{
var next = pipeline;
var step = _steps[i];
if (step is Func<TIn, PipelineContext, Func<TIn, TOut>, TOut> funcStep)
pipeline = input => funcStep(input, new PipelineContext(), next);
else
pipeline = input => ((IPipelineStep<TIn, TOut>)step).Process(input, new PipelineContext(), next);
}
return (input, ctx) => pipeline(input);
}
}
// 使用
var pipeline = new PipelineBuilder<string, string>()
.Use((input, ctx, next) =>
{
ctx.Logs.Add("步骤1: 去除空格");
return next(input.Trim());
})
.Use((input, ctx, next) =>
{
ctx.Logs.Add("步骤2: 转大写");
return next(input.ToUpper());
})
.Use((input, ctx, next) =>
{
ctx.Logs.Add("步骤3: 添加前缀");
return $"PREFIX_{input}";
})
.Build();
var result = pipeline(" hello world ", new PipelineContext());
Console.WriteLine(result); // PREFIX_HELLO WORLD请求处理管道
public class RequestContext
{
public string Path { get; init; } = "";
public string Method { get; init; } = "";
public Dictionary<string, string> Headers { get; init; } = new();
public string? Body { get; set; }
public int StatusCode { get; set; } = 200;
public string Response { get; set; } = "";
public long DurationMs { get; set; }
}
public class RequestPipeline
{
private readonly List<Func<RequestContext, Func<Task>, Task>> _middleware = new();
public RequestPipeline Use(Func<RequestContext, Func<Task>, Task> middleware)
{
_middleware.Add(middleware);
return this;
}
public async Task<RequestContext> ExecuteAsync(RequestContext ctx)
{
var sw = System.Diagnostics.Stopwatch.StartNew();
Func<Task> pipeline = () => { ctx.StatusCode = 404; ctx.Response = "Not Found"; return Task.CompletedTask; };
for (int i = _middleware.Count - 1; i >= 0; i--)
{
var next = pipeline;
var mw = _middleware[i];
pipeline = () => mw(ctx, next);
}
await pipeline();
sw.Stop();
ctx.DurationMs = sw.ElapsedMilliseconds;
return ctx;
}
}
// 使用
var app = new RequestPipeline();
app.Use(async (ctx, next) => { Console.WriteLine($"-> {ctx.Method} {ctx.Path}"); await next(); });
app.Use(async (ctx, next) =>
{
if (!ctx.Headers.ContainsKey("Authorization")) { ctx.StatusCode = 401; ctx.Response = "Unauthorized"; return; }
await next();
});
app.Use(async (ctx, next) =>
{
if (ctx.Path == "/api/hello") { ctx.StatusCode = 200; ctx.Response = "Hello!"; return; }
await next();
});
var result = await app.ExecuteAsync(new RequestContext { Path = "/api/hello", Method = "GET", Headers = { { "Authorization", "Bearer x" } } });数据处理管道
public static class DataPipeline
{
public static IEnumerable<T> Pipe<T>(this IEnumerable<T> source, Func<IEnumerable<T>, IEnumerable<T>> transform)
=> transform(source);
public static IEnumerable<string> ProcessData(string[] rawData) => rawData
.Pipe(data => data.Where(s => !string.IsNullOrWhiteSpace(s)))
.Pipe(data => data.Select(s => s.Trim().ToLower()))
.Pipe(data => data.Distinct())
.Pipe(data => data.OrderBy(s => s));
}实战:订单处理管道
在电商系统中,订单处理通常涉及多个步骤:验证、计算价格、检查库存、应用优惠、生成物流单号等。使用管道模式可以将这些步骤串联起来。
public class Order
{
public string OrderId { get; set; } = "";
public List<OrderItem> Items { get; set; } = new();
public decimal SubTotal { get; set; }
public decimal Discount { get; set; }
public decimal TotalAmount { get; set; }
public string CouponCode { get; set; } = "";
public string? ShippingTracking { get; set; }
public bool IsValid { get; set; }
public List<string> Errors { get; set; } = new();
}
public class OrderItem
{
public string ProductId { get; set; } = "";
public string Name { get; set; } = "";
public int Quantity { get; set; }
public decimal UnitPrice { get; set; }
public decimal Amount => Quantity * UnitPrice;
}
// 订单管道
public class OrderPipeline
{
private readonly List<Func<Order, Func<Task>, Task>> _steps = new();
public OrderPipeline Use(Func<Order, Func<Task>, Task> step)
{
_steps.Add(step);
return this;
}
public async Task<Order> ExecuteAsync(Order order)
{
Func<Task> pipeline = () => Task.CompletedTask;
for (int i = _steps.Count - 1; i >= 0; i--)
{
var next = pipeline;
var step = _steps[i];
pipeline = () => step(order, next);
}
await pipeline();
return order;
}
}
// 各处理步骤
public static class OrderSteps
{
// 步骤 1: 验证订单
public static Func<Order, Func<Task>, Task> Validate =>
async (order, next) =>
{
Console.WriteLine("[验证] 检查订单数据");
if (order.Items.Count == 0) order.Errors.Add("订单为空");
if (order.Items.Any(i => i.Quantity <= 0)) order.Errors.Add("商品数量无效");
order.IsValid = order.Errors.Count == 0;
if (order.IsValid) await next();
};
// 步骤 2: 计算小计
public static Func<Order, Func<Task>, Task> CalculateSubTotal =>
async (order, next) =>
{
order.SubTotal = order.Items.Sum(i => i.Amount);
Console.WriteLine($"[计算] 小计: {order.SubTotal:C}");
await next();
};
// 步骤 3: 应用优惠
public static Func<Order, Func<Task>, Task> ApplyDiscount =>
async (order, next) =>
{
if (!string.IsNullOrEmpty(order.CouponCode))
{
order.Discount = order.SubTotal * 0.1m; // 10% 折扣
Console.WriteLine($"[优惠] 优惠码 {order.CouponCode}, 折扣: {order.Discount:C}");
}
await next();
};
// 步骤 4: 计算总额
public static Func<Order, Func<Task>, Task> CalculateTotal =>
async (order, next) =>
{
order.TotalAmount = order.SubTotal - order.Discount;
Console.WriteLine($"[计算] 总额: {order.TotalAmount:C}");
await next();
};
}
// 使用
var pipeline = new OrderPipeline()
.Use(OrderSteps.Validate)
.Use(OrderSteps.CalculateSubTotal)
.Use(OrderSteps.ApplyDiscount)
.Use(OrderSteps.CalculateTotal);
var order = new Order
{
OrderId = "ORD-001",
CouponCode = "SAVE10",
Items = new List<OrderItem>
{
new() { ProductId = "P1", Name = "笔记本", Quantity = 2, UnitPrice = 5000 },
new() { ProductId = "P2", Name = "鼠标", Quantity = 1, UnitPrice = 200 }
}
};
await pipeline.ExecuteAsync(order);实战:数据清洗 ETL 管道
public class DataRecord
{
public int Id { get; set; }
public string Name { get; set; } = "";
public string Email { get; set; } = "";
public DateTime? CreatedAt { get; set; }
public bool IsValid { get; set; }
public string? Error { get; set; }
}
public class EtlPipeline
{
private readonly List<Func<IEnumerable<DataRecord>, IEnumerable<DataRecord>>> _transforms = new();
public EtlPipeline Transform(Func<IEnumerable<DataRecord>, IEnumerable<DataRecord>> transform)
{
_transforms.Add(transform);
return this;
}
public IEnumerable<DataRecord> Execute(IEnumerable<DataRecord> source)
{
var result = source;
foreach (var transform in _transforms)
result = transform(result);
return result;
}
}
// 使用
var rawData = new List<DataRecord>
{
new() { Id = 1, Name = " 张三 ", Email = "zhangsan@example.com", CreatedAt = null },
new() { Id = 2, Name = "李四", Email = "invalid-email", CreatedAt = DateTime.Now },
new() { Id = 3, Name = "", Email = "", CreatedAt = null },
new() { Id = 4, Name = "王五", Email = "wangwu@example.com", CreatedAt = DateTime.Now },
new() { Id = 5, Name = "赵六", Email = "zhaoliu@example.com", CreatedAt = DateTime.Now },
};
var cleanData = new EtlPipeline()
.Transform(data => data.Select(r => { r.Name = r.Name.Trim(); return r; })) // 清洗名称
.Transform(data => data.Where(r => !string.IsNullOrEmpty(r.Name))) // 过滤空记录
.Transform(data => data.Select(r =>
{ // 验证邮箱
r.IsValid = r.Email.Contains("@");
if (!r.IsValid) r.Error = "邮箱格式无效";
return r;
}))
.Transform(data => data.Select(r =>
{ // 默认值
r.CreatedAt ??= DateTime.UtcNow;
return r;
}))
.Execute(rawData);
foreach (var record in cleanData)
Console.WriteLine($"[{(record.IsValid ? "OK" : "ERR")}] {record.Name} - {record.Email}");与其他模式的对比
管道模式 责任链模式 装饰器模式
+--------+ +--------+ +--------+
| 数据流 | | 请求流 | | 增强流 |
| 变换 | | 传递 | | 包装 |
+--------+ +--------+ +--------+
| 每步 | | 某步 | | 每层 |
| 都处理 | | 可处理 | | 都增强 |
+--------+ +--------+ +--------+
| 有输入 | | 无返回 | | 同接口 |
| 输出 | | 值 | | 透明 |
+--------+ +--------+ +--------+优点
缺点
总结
管道模式将处理流程分解为独立阶段,数据流经各阶段完成转换。ASP.NET Core 中间件是经典实现。泛型管道构建器支持类型安全的步骤组合。数据处理管道可使用 LINQ 的链式调用实现。建议在请求处理、数据转换、ETL 等需要多步处理的场景使用管道模式。
管道模式的本质价值在于:将一个复杂的处理流程拆分为若干个独立、可组合、可测试的小步骤,每一步只做一件事。这不仅让代码更清晰,也使得你可以像搭积木一样灵活组装不同的处理流程。
关键知识点
- 模式不是目标,降低耦合和控制变化才是目标。
- 先找变化点、稳定点和协作边界,再决定是否引入模式。
- 同一个模式在不同规模下的收益和代价差异很大。
项目落地视角
- 优先画出参与对象、依赖方向和调用链,再落到代码。
- 把模式放到一个真实场景里,比如支付、规则引擎、工作流或插件扩展。
- 配合单元测试或契约测试,保证重构后的行为没有漂移。
常见误区
- 为了看起来"高级"而套模式。
- 把简单问题拆成过多抽象层,导致阅读和排障都变难。
- 只会背 UML,不会解释为什么这里需要这个模式。
进阶路线
- 继续关注模式之间的组合用法,而不是孤立记忆。
- 从业务建模、演进策略和团队协作角度看模式的适用性。
- 把模式结论沉淀为项目模板、基类或约束文档。
适用场景
- 当你准备把《管道模式》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合在业务规则频繁变化、分支增多或对象协作复杂时引入。
- 当你希望提高扩展性,但又不想把系统拆得过度抽象时,这类主题很有参考价值。
落地建议
- 先识别变化点,再决定是否引入模式,而不是反过来套模板。
- 优先为模式的边界、依赖和调用路径画出简单结构图。
- 把模式落到一个明确场景,例如支付、规则计算、插件扩展或工作流。
排错清单
- 检查抽象层是否过多,导致调用路径和责任不清晰。
- 确认引入模式后是否真的减少了条件分支和重复代码。
- 警惕"为了模式而模式",尤其是在简单业务里。
复盘问题
- 如果把《管道模式》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《管道模式》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《管道模式》最大的收益和代价分别是什么?
实战:中间件管道(仿 ASP.NET Core)
以下实现一个类似 ASP.NET Core 的中间件管道,展示管道模式在生产代码中的实际应用。
/// <summary>
/// 轻量级中间件管道 — 可用于非 Web 场景的消息处理
/// </summary>
public class MiddlewarePipeline<TContext> where TContext : class
{
private readonly List<Func<TContext, Func<Task>, Task>> _middleware = new();
public MiddlewarePipeline<TContext> Use(Func<TContext, Func<Task>, Task> middleware)
{
_middleware.Add(middleware);
return this;
}
public Func<TContext, Task> Build()
{
// 从后往前构建委托链
Func<Task> app = () => Task.CompletedTask;
foreach (var mw in _middleware.AsEnumerable().Reverse())
{
var next = app;
app = () => mw(default!, next);
}
return async ctx =>
{
Func<Task> current = () => Task.CompletedTask;
foreach (var mw in _middleware.AsEnumerable().Reverse())
{
var next = current;
current = () => mw(ctx, next);
}
await current();
};
}
}
// 内置常用中间件
public static class CommonMiddleware
{
// 异常处理中间件
public static Func<TContext, Func<Task>, Task> ExceptionHandler<TContext>(
Action<TContext, Exception> handler) where TContext : class
{
return async (ctx, next) =>
{
try
{
await next();
}
catch (Exception ex)
{
handler(ctx, ex);
}
};
}
// 日志中间件
public static Func<TContext, Func<Task>, Task> Logging<TContext>(
Action<string> log) where TContext : class
{
return async (ctx, next) =>
{
log($"[Pipe] 开始处理: {ctx!.GetType().Name}");
var sw = System.Diagnostics.Stopwatch.StartNew();
await next();
sw.Stop();
log($"[Pipe] 处理完成: {sw.ElapsedMilliseconds}ms");
};
}
// 超时中间件
public static Func<TContext, Func<Task>, Task> Timeout<TContext>(
TimeSpan timeout) where TContext : class
{
return async (ctx, next) =>
{
using var cts = new CancellationTokenSource(timeout);
var task = next();
var completed = await Task.WhenAny(task, Task.Delay(timeout));
if (completed != task)
{
throw new TimeoutException($"管道执行超时: {timeout.TotalSeconds}s");
}
};
}
}
// 使用
var pipeline = new MiddlewarePipeline<MessageContext>()
.Use(CommonMiddleware.ExceptionHandler<MessageContext>((ctx, ex) =>
Console.WriteLine($"异常: {ex.Message}")))
.Use(CommonMiddleware.Logging<MessageContext>(Console.WriteLine))
.Use(CommonMiddleware.Timeout<MessageContext>(TimeSpan.FromSeconds(30)))
.Use(async (ctx, next) =>
{
// 业务处理
Console.WriteLine($"处理消息: {ctx.MessageId}");
await next();
});
var builtPipeline = pipeline.Build();
// await builtPipeline(new MessageContext { MessageId = "MSG-001" });管道模式与 IEnumerable 延迟执行
/// <summary>
/// 利用 LINQ 的延迟执行特性实现高效的流式处理管道
/// </summary>
public static class StreamPipeline
{
// 流式数据处理 — 适合大文件或数据流
public static IEnumerable<TResult> ProcessStream<T, TResult>(
IEnumerable<T> source,
Func<IEnumerable<T>, IEnumerable<TResult>> pipeline)
{
// LINQ 本身是延迟执行的,数据逐条通过管道
// 这意味着即使源数据有百万条,也不会一次性全部加载到内存
return pipeline(source);
}
// 实战:日志文件分析管道
public static IEnumerable<LogSummary> AnalyzeLogs(string filePath)
{
return File.ReadLines(filePath) // 逐行读取,不一次性加载
.Where(line => !string.IsNullOrWhiteSpace(line))
.Select(line => ParseLogEntry(line))
.Where(entry => entry != null)
.GroupBy(entry => entry!.Level)
.Select(group => new LogSummary
{
Level = group.Key,
Count = group.Count(),
LatestTime = group.Max(e => e.Timestamp),
SampleMessage = group.First().Message
})
.OrderByDescending(s => s.Count);
}
private static LogEntry? ParseLogEntry(string line)
{
// 简化的日志解析
var parts = line.Split('|');
if (parts.Length < 3) return null;
return new LogEntry
{
Timestamp = DateTime.TryParse(parts[0], out var t) ? t : DateTime.MinValue,
Level = parts[1].Trim(),
Message = parts[2].Trim()
};
}
}
public class LogEntry
{
public DateTime Timestamp { get; set; }
public string Level { get; set; } = "";
public string Message { get; set; } = "";
}
public class LogSummary
{
public string Level { get; set; } = "";
public int Count { get; set; }
public DateTime LatestTime { get; set; }
public string SampleMessage { get; set; } = "";
}管道模式在真实项目中的应用场景
场景 1:消息处理管道(消息队列消费者)
消息接收 → 反序列化 → 校验 → 去重 → 业务处理 → 结果持久化 → ACK
场景 2:API 网关请求管道
限流 → 认证 → 授权 → 参数校验 → 路由 → 转发 → 响应缓存 → 日志
场景 3:ETL 数据管道
数据抽取 → 格式转换 → 数据清洗 → 质量校验 → 业务规则 → 入库
场景 4:CI/CD 构建管道
代码拉取 → 依赖安装 → 编译 → 单元测试 → 集成测试 → 镜像构建 → 部署
场景 5:WPF/MVVM 命令管道
命令校验 → 权限检查 → 业务执行 → 结果通知 → 日志记录