MassTransit 消息总线
大约 8 分钟约 2332 字
MassTransit 消息总线
简介
MassTransit 是 .NET 平台最流行的消息总线框架,提供了统一的抽象层来使用 RabbitMQ、Azure Service Bus、Kafka 等消息中间件。它内置了消费者、发布/订阅、Saga 编排、调度、故障处理等功能,是构建事件驱动微服务的利器。
特点
基本用法
安装与配置
# RabbitMQ 传输
dotnet add package MassTransit.RabbitMQ/// <summary>
/// MassTransRabbitMQ 配置
/// </summary>
// Program.cs
builder.Services.AddMassTransit(x =>
{
// 自动注册消费者
x.AddConsumersFromNamespaceContaining<OrderCreatedConsumer>();
// 使用 RabbitMQ
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
// 自动配置端点
cfg.ConfigureEndpoints(context);
// 重试策略
cfg.UseMessageRetry(r =>
{
r.Exponential(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(5));
});
// 消息过期
cfg.UseInMemoryOutbox();
});
});定义消息
/// <summary>
/// MassTransit 消息定义 — 推荐使用 record
/// </summary>
// 事件 — 使用 Interface 标记
public interface OrderCreated
{
Guid OrderId { get; }
string UserId { get; }
decimal TotalAmount { get; }
DateTime CreatedAt { get; }
}
// 命令
public interface ProcessPayment
{
Guid OrderId { get; }
decimal Amount { get; }
string PaymentMethod { get; }
}
// record 实现
public record OrderCreatedEvent(
Guid OrderId,
string UserId,
decimal TotalAmount,
DateTime CreatedAt
) : OrderCreated;
public record ProcessPaymentCommand(
Guid OrderId,
decimal Amount,
string PaymentMethod
) : ProcessPayment;消费者
基本消费者
/// <summary>
/// 消费者 — 处理消息
/// </summary>
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
private readonly ILogger<OrderCreatedConsumer> _logger;
private readonly IInventoryService _inventoryService;
public OrderCreatedConsumer(ILogger<OrderCreatedConsumer> logger, IInventoryService inventoryService)
{
_logger = logger;
_inventoryService = inventoryService;
}
public async Task Consume(ConsumeContext<OrderCreated> context)
{
var msg = context.Message;
_logger.LogInformation("处理订单创建事件:{OrderId}", msg.OrderId);
// 扣减库存
await _inventoryService.ReserveForOrderAsync(msg.OrderId);
_logger.LogInformation("库存预留完成:{OrderId}", msg.OrderId);
}
}
// 注册消费者
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<OrderCreatedConsumer>();
x.AddConsumer<PaymentProcessedConsumer>();
});多消费者
/// <summary>
/// 一个消息多个消费者 — 广播模式
/// </summary>
// 消费者1:发送邮件通知
public class OrderNotificationConsumer : IConsumer<OrderCreated>
{
private readonly IEmailService _emailService;
public OrderNotificationConsumer(IEmailService emailService)
{
_emailService = emailService;
}
public async Task Consume(ConsumeContext<OrderCreated> context)
{
await _emailService.SendOrderConfirmationAsync(context.Message.UserId, context.Message.OrderId);
}
}
// 消费者2:更新统计
public class OrderStatisticsConsumer : IConsumer<OrderCreated>
{
private readonly IStatisticsService _statsService;
public OrderStatisticsConsumer(IStatisticsService statsService)
{
_statsService = statsService;
}
public async Task Consume(ConsumeContext<OrderCreated> context)
{
await _statsService.RecordNewOrderAsync(context.Message.TotalAmount);
}
}
// 消费者3:审计日志
public class OrderAuditConsumer : IConsumer<OrderCreated>
{
public Task Consume(ConsumeContext<OrderCreated> context)
{
Console.WriteLine($"审计日志:订单 {context.Message.OrderId} 创建");
return Task.CompletedTask;
}
}发布消息
从任意位置发布
/// <summary>
/// 发布消息 — 使用 IBus 或 IPublishEndpoint
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
private readonly IPublishEndpoint _publishEndpoint;
private readonly IRequestClient<CheckInventory> _requestClient;
public OrdersController(IPublishEndpoint publishEndpoint, IRequestClient<CheckInventory> requestClient)
{
_publishEndpoint = publishEndpoint;
_requestClient = requestClient;
}
// 发布事件
[HttpPost]
public async Task<IActionResult> Create([FromBody] CreateOrderRequest request)
{
var orderId = Guid.NewGuid();
// 发布事件 — 所有消费者都会收到
await _publishEndpoint.Publish<OrderCreated>(new OrderCreatedEvent(
orderId,
request.UserId,
request.TotalAmount,
DateTime.UtcNow
));
return Ok(new { OrderId = orderId });
}
// 发送命令 — 只有一个消费者处理
[HttpPost("payment")]
public async Task<IActionResult> ProcessPayment([FromBody] PaymentRequest request)
{
await _publishEndpoint.Send<ProcessPayment>(new ProcessPaymentCommand(
request.OrderId,
request.Amount,
request.PaymentMethod
));
return Accepted();
}
}请求/响应模式
/// <summary>
/// 请求/响应 — RPC over Message Queue
/// </summary>
// 请求消息
public interface CheckInventory
{
int ProductId { get; }
int Quantity { get; }
}
// 响应消息
public interface InventoryResult
{
bool Available { get; }
int StockQuantity { get; }
}
// 响应方
public class CheckInventoryConsumer : IConsumer<CheckInventory>
{
private readonly IInventoryRepository _repo;
public CheckInventoryConsumer(IInventoryRepository repo) => _repo = repo;
public async Task Consume(ConsumeContext<CheckInventory> context)
{
var stock = await _repo.GetStockAsync(context.Message.ProductId);
var available = stock >= context.Message.Quantity;
await context.RespondAsync<InventoryResult>(new
{
Available = available,
StockQuantity = stock
});
}
}
// 请求方
public class OrderService
{
private readonly IRequestClient<CheckInventory> _client;
public OrderService(IRequestClient<CheckInventory> client) => _client = client;
public async Task<bool> CheckStockAsync(int productId, int quantity)
{
var response = await _client.GetResponse<InventoryResult>(new
{
ProductId = productId,
Quantity = quantity
});
return response.Message.Available;
}
}Saga 状态机 — 长事务编排
订单流程编排
/// <summary>
/// MassTransit Saga 状态机 — 编排订单完整流程
/// </summary>
// 安装:dotnet add package MassTransit.Azure.ServiceBus.Core
public class OrderSaga : MassTransitStateMachine<OrderSagaState>
{
public State Created { get; private set; } = null!;
public State PaymentPending { get; private set; } = null!;
public State Paid { get; private set; } = null!;
public State Shipping { get; private set; } = null!;
public State Completed { get; private set; } = null!;
public State Cancelled { get; private set; } = null!;
public Event<OrderCreated> OrderCreatedEvent { get; private set; } = null!;
public Event<PaymentCompleted> PaymentCompletedEvent { get; private set; } = null!;
public Event<OrderShipped> OrderShippedEvent { get; private set; } = null!;
public Event<OrderCancelled> OrderCancelledEvent { get; private set; } = null!;
public Event<PaymentFailed> PaymentFailedEvent { get; private set; } = null!;
public OrderSaga()
{
// 关联 ID
Event(() => OrderCreatedEvent, x => x.CorrelateById(c => c.Message.OrderId));
Event(() => PaymentCompletedEvent, x => x.CorrelateById(c => c.Message.OrderId));
Event(() => OrderShippedEvent, x => x.CorrelateById(c => c.Message.OrderId));
Event(() => OrderCancelledEvent, x => x.CorrelateById(c => c.Message.OrderId));
// 初始状态
Initially(
When(OrderCreatedEvent)
.Then(ctx => {
ctx.Saga.UserId = ctx.Message.UserId;
ctx.Saga.TotalAmount = ctx.Message.TotalAmount;
ctx.Saga.CreatedAt = DateTime.UtcNow;
})
.Send(new Uri("queue:process-payment"), ctx => new ProcessPaymentCommand(
ctx.Saga.CorrelationId,
ctx.Saga.TotalAmount,
"Alipay"))
.TransitionTo(PaymentPending)
);
// 等待支付
During(PaymentPending,
When(PaymentCompletedEvent)
.Then(ctx => ctx.Saga.PaymentMethod = ctx.Message.PaymentMethod)
.Send(new Uri("queue:ship-order"), ctx => new ShipOrderCommand(ctx.Saga.CorrelationId))
.TransitionTo(Shipping),
When(PaymentFailedEvent)
.Then(ctx => ctx.Saga.FailureReason = ctx.Message.Reason)
.TransitionTo(Cancelled),
When(OrderCancelledEvent)
.TransitionTo(Cancelled)
);
// 等待发货
During(Shipping,
When(OrderShippedEvent)
.Then(ctx => {
ctx.Saga.TrackingNumber = ctx.Message.TrackingNumber;
ctx.Saga.CompletedAt = DateTime.UtcNow;
})
.TransitionTo(Completed)
);
}
}
// Saga 状态
public class OrderSagaState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; } = "";
public string UserId { get; set; } = "";
public decimal TotalAmount { get; set; }
public string? PaymentMethod { get; set; }
public string? TrackingNumber { get; set; }
public string? FailureReason { get; set; }
public DateTime CreatedAt { get; set; }
public DateTime? CompletedAt { get; set; }
}错误处理
重试与死信
/// <summary>
/// MassTransit 错误处理 — 重试、死信、降级
/// </summary>
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<OrderCreatedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
// 全局重试
cfg.UseMessageRetry(r =>
{
r.Immediate(3); // 立即重试 3 次
// r.Exponential(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(5));
});
// 错误队列配置
cfg.ReceiveEndpoint("order-service", e =>
{
// 重试次数
e.UseMessageRetry(r => r.Immediate(3));
// 消费错误后进入 _error 队列
// 自动创建 {queue}_error 和 {queue}_skipped 队列
});
});
});
// 消费者级别的错误处理
public class SafeOrderConsumer : IConsumer<OrderCreated>, IConsumer<Fault<OrderCreated>>
{
public async Task Consume(ConsumeContext<OrderCreated> context)
{
// 正常处理逻辑
throw new Exception("模拟错误"); // 会触发重试
}
// 错误消费 — 所有重试失败后
public async Task Consume(ConsumeContext<Fault<OrderCreated>> context)
{
// 记录到数据库、发送告警
Console.WriteLine($"订单 {context.Message.Message.OrderId} 处理失败");
await Task.CompletedTask;
}
}消息模式对比
| 模式 | 方法 | 特点 |
|---|---|---|
| 发布/订阅 | Publish | 一消息多消费者 |
| 点对点 | Send | 一消息一消费者 |
| 请求/响应 | Request/Respond | RPC over MQ |
| Saga | StateMachine | 长事务编排 |
优点
缺点
总结
MassTransit 是 .NET 事件驱动架构的最佳选择。发布/订阅用于事件广播,请求/响应用于同步查询,Saga 用于长事务编排。与直接使用 RabbitMQ SDK 相比,MassTransit 提供了更高级的抽象和更完善的基础设施支持。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
- 消息系统主题的核心是削峰、解耦和异步一致性,而不是单纯“能发能收”。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
- 明确消息契约、序列化格式、幂等键和失败补偿机制。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
- 把消息队列当同步 RPC 替代品。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
- 继续补齐顺序性、事务消息、Outbox、事件溯源和流式治理。
适用场景
- 当你准备把《MassTransit 消息总线》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《MassTransit 消息总线》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《MassTransit 消息总线》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《MassTransit 消息总线》最大的收益和代价分别是什么?
