整合 RabbitMQ 消息队列
大约 10 分钟约 2944 字
整合 RabbitMQ 消息队列
简介
RabbitMQ 是最流行的开源消息代理,支持多种消息模式(工作队列、发布订阅、路由、RPC)。ASP.NET Core 通过 RabbitMQ.Client 或 CAP 框架集成 RabbitMQ,实现应用解耦、异步处理、流量削峰。掌握交换机、队列、路由和死信机制,可以构建可靠的消息驱动架构。
特点
基础连接
连接管理
/// <summary>
/// RabbitMQ 连接管理
/// </summary>
public class RabbitMQConnection : IDisposable
{
private readonly IConnection _connection;
private readonly ILogger<RabbitMQConnection> _logger;
public RabbitMQConnection(IConfiguration config, ILogger<RabbitMQConnection> logger)
{
_logger = logger;
var factory = new ConnectionFactory
{
HostName = config["RabbitMQ:Host"] ?? "localhost",
Port = int.Parse(config["RabbitMQ:Port"] ?? "5672"),
UserName = config["RabbitMQ:User"] ?? "guest",
Password = config["RabbitMQ:Pass"] ?? "guest",
VirtualHost = config["RabbitMQ:VHost"] ?? "/",
DispatchConsumersAsync = true
};
_connection = factory.CreateConnection();
_logger.LogInformation("RabbitMQ 连接已建立");
}
public IModel CreateChannel() => _connection.CreateModel();
public void Dispose()
{
_connection?.Dispose();
}
}
// 注册
builder.Services.AddSingleton<RabbitMQConnection>();工作队列模式
生产者与消费者
/// <summary>
/// 工作队列 — 点对点消息
/// </summary>
public class OrderMessageService
{
private readonly RabbitMQConnection _connection;
public OrderMessageService(RabbitMQConnection connection)
{
_connection = connection;
}
// 生产者 — 发送订单消息
public void PublishOrder(Order order)
{
using var channel = _connection.CreateChannel();
// 声明队列(幂等操作)
channel.QueueDeclare(
queue: "order.created",
durable: true, // 持久化
exclusive: false,
autoDelete: false,
arguments: null);
var body = JsonSerializer.SerializeToUtf8Bytes(order);
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 消息持久化
properties.MessageId = Guid.NewGuid().ToString();
properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
channel.BasicPublish(
exchange: "",
routingKey: "order.created",
basicProperties: properties,
body: body);
}
}
// 消费者 — 处理订单消息
public class OrderCreatedConsumer : BackgroundService
{
private readonly RabbitMQConnection _connection;
private readonly ILogger<OrderCreatedConsumer> _logger;
public OrderCreatedConsumer(RabbitMQConnection connection, ILogger<OrderCreatedConsumer> logger)
{
_connection = connection;
_logger = logger;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
var channel = _connection.CreateChannel();
channel.QueueDeclare("order.created", durable: true, exclusive: false, autoDelete: false);
channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
try
{
var order = JsonSerializer.Deserialize<Order>(ea.Body.Span);
await ProcessOrderAsync(order!);
channel.BasicAck(ea.DeliveryTag, multiple: false);
_logger.LogInformation("订单处理完成:{OrderId}", order!.Id);
}
catch (Exception ex)
{
_logger.LogError(ex, "订单处理失败");
channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: true);
}
};
channel.BasicConsume("order.created", autoAck: false, consumer);
return Task.CompletedTask;
}
private async Task ProcessOrderAsync(Order order)
{
// 业务处理:扣减库存、发送邮件等
await Task.Delay(100);
}
}发布订阅模式
Exchange 交换机
/// <summary>
/// 发布订阅 — Fanout Exchange
/// </summary>
public class EventPublisher
{
private readonly IModel _channel;
public EventPublisher(RabbitMQConnection connection)
{
_channel = connection.CreateChannel();
// 声明 Fanout 交换机
_channel.ExchangeDeclare("order.events", ExchangeType.Fanout, durable: true);
}
public void Publish<T>(string routingKey, T message)
{
var body = JsonSerializer.SerializeToUtf8Bytes(message);
var properties = _channel.CreateBasicProperties();
properties.Persistent = true;
_channel.BasicPublish("order.events", routingKey, properties, body);
}
}
// 多个消费者各自声明队列绑定到交换机
public class NotificationConsumer : BackgroundService
{
private readonly RabbitMQConnection _connection;
public NotificationConsumer(RabbitMQConnection connection)
{
_connection = connection;
}
protected override Task ExecuteAsync(CancellationToken ct)
{
var channel = _connection.CreateChannel();
channel.ExchangeDeclare("order.events", ExchangeType.Fanout, durable: true);
var queueName = channel.QueueDeclare().QueueName; // 临时队列
channel.QueueBind(queueName, "order.events", "");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
var message = JsonSerializer.Deserialize<OrderEvent>(ea.Body.Span);
await SendNotificationAsync(message!);
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queueName, autoAck: false, consumer);
return Task.CompletedTask;
}
}死信队列
失败消息处理
/// <summary>
/// 死信队列 — 处理消费失败的消息
/// </summary>
public class DeadLetterSetup
{
private readonly IModel _channel;
public DeadLetterSetup(RabbitMQConnection connection)
{
_channel = connection.CreateChannel();
// 死信交换机和队列
_channel.ExchangeDeclare("dlx.exchange", ExchangeType.Direct, durable: true);
_channel.QueueDeclare("dlx.queue", durable: true, exclusive: false, autoDelete: false);
_channel.QueueBind("dlx.queue", "dlx.exchange", "dlx.routing");
// 业务队列 — 绑定死信
var arguments = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", "dlx.exchange" },
{ "x-dead-letter-routing-key", "dlx.routing" },
{ "x-message-ttl", 3600000 } // 消息 TTL 1小时
};
_channel.QueueDeclare("order.process",
durable: true, exclusive: false, autoDelete: false, arguments: arguments);
}
// 重试策略:失败消息重新入队
public void RetryWithBackoff(BasicDeliverEventArgs ea, IModel channel, int maxRetries = 3)
{
var headers = ea.BasicProperties.Headers ?? new Dictionary<string, object>();
var retryCount = headers.ContainsKey("x-retry-count")
? (int)headers["x-retry-count"] : 0;
if (retryCount < maxRetries)
{
var properties = channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object>(headers)
{
["x-retry-count"] = retryCount + 1
};
channel.BasicPublish("", "order.process", properties, ea.Body.ToArray());
channel.BasicAck(ea.DeliveryTag, false);
}
else
{
// 超过重试次数,进入死信
channel.BasicReject(ea.DeliveryTag, false);
}
}
}主题路由模式
Topic Exchange 精确匹配
/// <summary>
/// Topic Exchange — 主题路由(通配符匹配)
/// </summary>
public class TopicExchangeSetup
{
private readonly IModel _channel;
public TopicExchangeSetup(RabbitMQConnection connection)
{
_channel = connection.CreateChannel();
// 声明 Topic 交换机
_channel.ExchangeDeclare("order.topic", ExchangeType.Topic, durable: true);
}
// 生产者 — 发布带主题的消息
public void PublishOrderEvent(string routingKey, OrderEvent orderEvent)
{
// routingKey 格式: <业务>.<操作>.<区域>
// 例如: order.created.cn, order.updated.us, payment.completed.cn
var body = JsonSerializer.SerializeToUtf8Bytes(orderEvent);
var properties = _channel.CreateBasicProperties();
properties.Persistent = true;
properties.MessageId = Guid.NewGuid().ToString();
properties.ContentType = "application/json";
_channel.BasicPublish("order.topic", routingKey, properties, body);
}
}
// 消费者 — 订阅特定主题
public class OrderEventConsumer : BackgroundService
{
private readonly RabbitMQConnection _connection;
public OrderEventConsumer(RabbitMQConnection connection)
{
_connection = connection;
}
protected override Task ExecuteAsync(CancellationToken ct)
{
var channel = _connection.CreateChannel();
channel.ExchangeDeclare("order.topic", ExchangeType.Topic, durable: true);
// 队列 1:只接收中国区订单创建事件
channel.QueueDeclare("order.created.cn.queue", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind("order.created.cn.queue", "order.topic", "order.created.cn");
// 队列 2:接收所有订单事件(通配符 * 匹配一个词)
channel.QueueDeclare("order.all.queue", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind("order.all.queue", "order.topic", "order.*.*");
// 队列 3:接收所有业务事件(通配符 # 匹配零个或多个词)
channel.QueueDeclare("all.events.queue", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind("all.events.queue", "order.topic", "#.created.#");
return Task.CompletedTask;
}
}Direct 路由模式
精确路由
/// <summary>
/// Direct Exchange — 精确路由匹配
/// </summary>
public class DirectExchangeSetup
{
private readonly IModel _channel;
public DirectExchangeSetup(RabbitMQConnection connection)
{
_channel = connection.CreateChannel();
// 声明 Direct 交换机
_channel.ExchangeDeclare("log.exchange", ExchangeType.Direct, durable: true);
// 不同日志级别的队列
_channel.QueueDeclare("log.error", durable: true, exclusive: false, autoDelete: false);
_channel.QueueDeclare("log.warning", durable: true, exclusive: false, autoDelete: false);
_channel.QueueDeclare("log.info", durable: true, exclusive: false, autoDelete: false);
// 绑定路由键
_channel.QueueBind("log.error", "log.exchange", "error");
_channel.QueueBind("log.warning", "log.exchange", "warning");
_channel.QueueBind("log.info", "log.exchange", "info");
}
// 发送日志
public void SendLog(string level, string message)
{
var body = JsonSerializer.SerializeToUtf8Bytes(new { Level = level, Message = message, Time = DateTime.UtcNow });
var properties = _channel.CreateBasicProperties();
properties.Persistent = true;
_channel.BasicPublish("log.exchange", level, properties, body);
}
}消息幂等性
重复消费防护
/// <summary>
/// 消息幂等处理 — 防止重复消费
/// </summary>
public class IdempotentConsumer : BackgroundService
{
private readonly RabbitMQConnection _connection;
private readonly IDistributedCache _cache;
private readonly ILogger<IdempotentConsumer> _logger;
private readonly TimeSpan _idempotencyWindow = TimeSpan.FromHours(24);
public IdempotentConsumer(
RabbitMQConnection connection,
IDistributedCache cache,
ILogger<IdempotentConsumer> logger)
{
_connection = connection;
_cache = cache;
_logger = logger;
}
protected override Task ExecuteAsync(CancellationToken ct)
{
var channel = _connection.CreateChannel();
channel.QueueDeclare("order.process", durable: true, exclusive: false, autoDelete: false);
channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += async (model, ea) =>
{
var messageId = ea.BasicProperties.MessageId;
if (string.IsNullOrEmpty(messageId))
{
// 没有 MessageId,生成幂等键
messageId = ComputeIdempotencyKey(ea);
}
// 检查是否已处理过
var processed = await _cache.GetStringAsync($"msg:{messageId}");
if (processed == "1")
{
_logger.LogWarning("重复消息,跳过: {MessageId}", messageId);
channel.BasicAck(ea.DeliveryTag, multiple: false);
return;
}
try
{
await ProcessOrderAsync(ea);
// 标记为已处理
await _cache.SetStringAsync($"msg:{messageId}", "1",
new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = _idempotencyWindow });
channel.BasicAck(ea.DeliveryTag, multiple: false);
}
catch (Exception ex)
{
_logger.LogError(ex, "消息处理失败: {MessageId}", messageId);
channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: true);
}
};
channel.BasicConsume("order.process", autoAck: false, consumer);
return Task.CompletedTask;
}
private static string ComputeIdempotencyKey(BasicDeliverEventArgs ea)
{
// 基于消息内容生成哈希作为幂等键
var content = Convert.ToBase64String(ea.Body.ToArray());
return Convert.ToHexString(System.Security.Cryptography.SHA256.HashData(
System.Text.Encoding.UTF8.GetBytes(content)))[..16];
}
private async Task ProcessOrderAsync(BasicDeliverEventArgs ea) { }
}消息序列化与契约
消息格式最佳实践
/// <summary>
/// 消息契约设计
/// </summary>
// 1. 消息体使用不可变类型
public record OrderCreatedMessage(
Guid OrderId,
long UserId,
decimal TotalAmount,
IReadOnlyList<OrderItemMessage> Items,
DateTimeOffset CreatedAt);
public record OrderItemMessage(
int ProductId,
string ProductName,
int Quantity,
decimal UnitPrice);
// 2. 消息信封(Envelope 模式)— 携带元数据
public record MessageEnvelope<T>(
string MessageId,
string MessageType,
DateTimeOffset Timestamp,
int Version,
T Payload);
// 3. 序列化配置
public class MessageSerializer
{
private static readonly JsonSerializerOptions Options = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false
};
public static byte[] Serialize<T>(T message)
{
return JsonSerializer.SerializeToUtf8Bytes(message, Options);
}
public static T? Deserialize<T>(ReadOnlySpan<byte> data)
{
return JsonSerializer.Deserialize<T>(data, Options);
}
}
// 4. 消息版本兼容
public class MessageVersionHandler
{
// 旧版本消息 → 新版本消息的转换
public OrderCreatedMessage V1ToV2(OrderCreatedMessageV1 v1)
{
return new OrderCreatedMessage(
Guid.Parse(v1.OrderId),
v1.UserId,
v1.TotalAmount,
v1.Items.Select(i => new OrderItemMessage(i.ProductId, i.Name, i.Qty, i.Price)).ToList(),
DateTimeOffset.UtcNow);
}
}
public record OrderCreatedMessageV1(
string OrderId,
long UserId,
decimal TotalAmount,
List<V1Item> Items);
public record V1Item(int ProductId, string Name, int Qty, decimal Price);连接池与性能优化
Channel 复用策略
/// <summary>
/// RabbitMQ Channel 池化管理
/// </summary>
public class RabbitMQChannelPool : IDisposable
{
private readonly RabbitMQConnection _connection;
private readonly Channel<IModel, IModel> _pool;
private readonly ILogger<RabbitMQChannelPool> _logger;
public RabbitMQChannelPool(
RabbitMQConnection connection,
ILogger<RabbitMQChannelPool> logger,
int poolSize = 10)
{
_connection = connection;
_logger = logger;
_pool = Channel.CreateBounded<IModel>(new BoundedChannelOptions(poolSize)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
SingleWriter = false
});
// 预创建 Channel
for (int i = 0; i < poolSize; i++)
{
_pool.Writer.TryWrite(connection.CreateChannel());
}
}
public async Task<IModel> GetChannelAsync()
{
var channel = await _pool.Reader.ReadAsync();
// 检查 Channel 是否仍然可用
if (!channel.IsOpen)
{
_logger.LogWarning("Channel 已关闭,创建新 Channel");
return _connection.CreateChannel();
}
return channel;
}
public async Task ReturnChannelAsync(IModel channel)
{
if (channel.IsOpen)
{
await _pool.Writer.WriteAsync(channel);
}
else
{
channel.Dispose();
}
}
public void Dispose()
{
while (_pool.Reader.TryRead(out var channel))
{
channel.Dispose();
}
}
}
// 注册
builder.Services.AddSingleton<RabbitMQChannelPool>();监控与告警
消息队列健康指标
/// <summary>
/// RabbitMQ 监控服务
/// </summary>
public class RabbitMQHealthCheck : IHealthCheck
{
private readonly RabbitMQConnection _connection;
public RabbitMQHealthCheck(RabbitMQConnection connection)
{
_connection = connection;
}
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken ct = default)
{
try
{
using var channel = _connection.CreateChannel();
// 尝试声明临时队列验证连接
channel.QueueDeclarePassive("health-check-probe");
return HealthCheckResult.Healthy("RabbitMQ 连接正常");
}
catch (Exception ex)
{
return HealthCheckResult.Degraded($"RabbitMQ 连接异常: {ex.Message}");
}
}
}
// 注册健康检查
builder.Services.AddHealthChecks()
.AddCheck<RabbitMQHealthCheck>("rabbitmq", tags: new[] { "infrastructure" });优点
缺点
总结
RabbitMQ 选择模式:点对点用默认交换机(工作队列),广播用 Fanout,路由用 Direct,主题匹配用 Topic。消息可靠性三要素:持久化队列 + 持久化消息 + 手动 ACK。失败处理用死信队列 + 重试策略。推荐使用 CAP 框架简化消息事务管理。
关键知识点
- 先分清这个主题位于请求链路、后台任务链路还是基础设施链路。
- 服务端主题通常不只关心功能正确,还关心稳定性、性能和可观测性。
- 任何框架能力都要结合配置、生命周期、异常传播和外部依赖一起看。
- 消息系统主题的核心是削峰、解耦和异步一致性,而不是单纯“能发能收”。
项目落地视角
- 画清请求进入、业务执行、外部调用、日志记录和错误返回的完整路径。
- 为关键链路补齐超时、重试、熔断、追踪和结构化日志。
- 把配置与敏感信息分离,并明确不同环境的差异来源。
- 明确消息契约、序列化格式、幂等键和失败补偿机制。
常见误区
- 只会堆中间件或组件,不知道它们在链路中的执行顺序。
- 忽略生命周期和线程池、连接池等运行时资源约束。
- 没有监控和测试就对性能或可靠性下结论。
- 把消息队列当同步 RPC 替代品。
进阶路线
- 继续向运行时行为、可观测性、发布治理和微服务协同深入。
- 把主题和数据库、缓存、消息队列、认证授权联动起来理解。
- 沉淀团队级模板,包括统一异常处理、配置约定和基础设施封装。
- 继续补齐顺序性、事务消息、Outbox、事件溯源和流式治理。
适用场景
- 当你准备把《整合 RabbitMQ 消息队列》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合 API 服务、后台任务、实时通信、认证授权和微服务协作场景。
- 当需求开始涉及稳定性、性能、可观测性和发布流程时,这类主题会成为基础设施能力。
落地建议
- 先定义请求链路与失败路径,再决定中间件、过滤器、服务边界和依赖方式。
- 为关键链路补日志、指标、追踪、超时与重试策略。
- 环境配置与敏感信息分离,避免把生产参数写死在代码或镜像里。
排错清单
- 先确认问题发生在路由、模型绑定、中间件、业务层还是基础设施层。
- 检查 DI 生命周期、配置来源、序列化规则和认证上下文。
- 查看线程池、连接池、缓存命中率和外部依赖超时。
复盘问题
- 如果把《整合 RabbitMQ 消息队列》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《整合 RabbitMQ 消息队列》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《整合 RabbitMQ 消息队列》最大的收益和代价分别是什么?
