消息队列可靠性保障
大约 15 分钟约 4644 字
消息队列可靠性保障
简介
在分布式系统中,消息队列是解耦服务间通信的核心基础设施。然而,"发一条消息"看似简单,要在网络抖动、服务宕机、流量洪峰等各种异常情况下保证消息不丢失、不重复、不乱序,却是一个极具挑战性的工程问题。消息可靠性涉及生产者、消费者、消息中间件三个环节,每一个环节都可能成为消息丢失或重复的突破口。
本文将系统讲解在 ASP.NET Core 环境下,如何基于 RabbitMQ 和 Kafka 构建高可靠的消息系统,涵盖从理论模型到生产级实现的所有关键环节。
特点
- 端到端可靠性:覆盖从生产到消费的全链路保障
- Exactly-Once 语义:在业务层面实现精确一次处理
- 可观测性:全链路消息追踪与告警
- 可恢复性:通过死信队列和重试机制实现故障自愈
- 有序性保障:在分区/队列级别保证消息顺序
核心概念与实现
一、可靠性模型:三个环节
生产者 -> [消息队列] -> 消费者
  |            |            |
  确认机制      持久化        手动 ACK
  重试机制      镜像队列      幂等消费
  事务/本地表   副本机制      死信队列

消息可能在三个环节丢失:
- 生产阶段:消息发送到队列之前
- 存储阶段:消息在队列中的持久化
- 消费阶段:消费者处理消息时
二、RabbitMQ 可靠生产者
// ============ RabbitMQ connection registration ============
// NuGet: RabbitMQ.Client
builder.Services.AddSingleton<IConnection>(sp =>
{
    var configuration = sp.GetRequiredService<IConfiguration>();

    var connectionFactory = new ConnectionFactory
    {
        HostName = configuration["RabbitMQ:Host"],
        Port = configuration.GetValue("RabbitMQ:Port", 5672),
        UserName = configuration["RabbitMQ:Username"],
        Password = configuration["RabbitMQ:Password"],
        VirtualHost = configuration["RabbitMQ:VHost"] ?? "/",
        // Recover the connection automatically after network failures.
        AutomaticRecoveryEnabled = true,
        NetworkRecoveryInterval = TimeSpan.FromSeconds(5),
        RequestedHeartbeat = TimeSpan.FromSeconds(60),
        // Dispatch consumer callbacks through the async consumer pipeline.
        DispatchConsumersAsync = true
    };

    return connectionFactory.CreateConnection();
});
// Register one channel per resolution scope (one channel per unit of work).
// NOTE(review): the IModel created here is handed to RabbitMQSubscription —
// presumably that wrapper disposes the channel when the scope ends; verify,
// otherwise a channel leaks on every scope.
builder.Services.AddScoped<ISubscription>(sp =>
{
    var connection = sp.GetRequiredService<IConnection>();
    var channel = connection.CreateModel();
    return new RabbitMQSubscription(channel);
});
// ============ Reliable message publisher ============
public class ReliableRabbitMQPublisher
{
    private readonly IConnection _connection;
    private readonly ILogger<ReliableRabbitMQPublisher> _logger;
    // NOTE(review): injected but currently unused; kept so the constructor
    // signature (and existing DI registrations) stays backward-compatible.
    private readonly AppDbContext _dbContext;

    public ReliableRabbitMQPublisher(
        IConnection connection,
        ILogger<ReliableRabbitMQPublisher> logger,
        AppDbContext dbContext)
    {
        _connection = connection;
        _logger = logger;
        _dbContext = dbContext;
    }

    /// <summary>
    /// Publishes a message with Publisher Confirms so the caller knows
    /// whether the broker accepted it.
    /// </summary>
    /// <returns>true when the broker ACKed; false on NACK or confirm timeout.</returns>
    /// <remarks>
    /// Fixes vs. original: the method was declared <c>async</c> without any
    /// <c>await</c> (CS1998) and ignored <paramref name="ct"/> — the body is
    /// fully synchronous (WaitForConfirms blocks), so it now returns a
    /// completed task and honors cancellation up-front. The redundant
    /// <c>DeliveryMode = 2</c> was dropped: <c>Persistent = true</c> already
    /// sets it.
    /// </remarks>
    public Task<bool> PublishWithConfirmAsync<T>(
        T message,
        string exchange,
        string routingKey,
        CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();

        using var channel = _connection.CreateModel();
        channel.ConfirmSelect(); // enable publisher confirms on this channel

        var body = JsonSerializer.SerializeToUtf8Bytes(message);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;                     // survive broker restart
        properties.MessageId = Guid.NewGuid().ToString(); // unique id for idempotent consumers
        properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
        properties.ContentType = "application/json";

        channel.BasicPublish(
            exchange: exchange,
            routingKey: routingKey,
            basicProperties: properties,
            body: body);

        // Block until the broker confirms (5 s budget).
        try
        {
            var confirmed = channel.WaitForConfirms(TimeSpan.FromSeconds(5));
            if (confirmed)
            {
                _logger.LogInformation("消息已确认: Exchange={Ex}, RoutingKey={Key}, MsgId={Id}",
                    exchange, routingKey, properties.MessageId);
            }
            else
            {
                _logger.LogWarning("消息确认失败(NACK): Exchange={Ex}, RoutingKey={Key}",
                    exchange, routingKey);
            }
            return Task.FromResult(confirmed);
        }
        catch (TimeoutException)
        {
            _logger.LogError("等待消息确认超时");
            return Task.FromResult(false);
        }
    }
}
三、Transactional Outbox 模式
// ============ Outbox table: guarantees messages are not lost ============
// One row per pending event; written in the same transaction as the business
// entities, later published to the broker by a background processor.
public class OutboxMessage
{
    public Guid Id { get; set; }
    // Target exchange / routing key for the eventual publish.
    public string Exchange { get; set; } = string.Empty;
    public string RoutingKey { get; set; } = string.Empty;
    // JSON-serialized event body.
    public string Payload { get; set; } = string.Empty;
    public string MessageType { get; set; } = string.Empty;
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    // Null until the broker has confirmed the publish.
    public DateTime? SentAt { get; set; }
    public int RetryCount { get; set; }
    public string? LastError { get; set; }
    public OutboxStatus Status { get; set; } = OutboxStatus.Pending;
}
// Lifecycle of an outbox row: Pending -> Sent on broker confirm,
// or Failed once the retry budget is exhausted.
public enum OutboxStatus
{
    Pending,
    Sent,
    Failed
}
// ============ Outbox dispatcher ============
public class OutboxService
{
    private const int MaxRetries = 5;  // give up after this many attempts
    private const int BatchSize = 100; // rows scanned per pass

    private readonly AppDbContext _dbContext;
    private readonly IConnection _connection;
    private readonly ILogger<OutboxService> _logger;

    public OutboxService(
        AppDbContext dbContext,
        IConnection connection,
        ILogger<OutboxService> logger)
    {
        _dbContext = dbContext;
        _connection = connection;
        _logger = logger;
    }

    /// <summary>
    /// Persists the business entities and the outbox message in one
    /// SaveChanges call (one database transaction): either both the business
    /// data and the pending message are stored, or neither is.
    /// </summary>
    public async Task SaveWithOutboxAsync<TEvent>(
        IEnumerable<object> entities,
        TEvent @event,
        string exchange,
        string routingKey,
        CancellationToken ct = default) where TEvent : class
    {
        var outboxMessage = new OutboxMessage
        {
            Id = Guid.NewGuid(),
            Exchange = exchange,
            RoutingKey = routingKey,
            Payload = JsonSerializer.Serialize(@event),
            MessageType = typeof(TEvent).Name,
            CreatedAt = DateTime.UtcNow,
            Status = OutboxStatus.Pending
        };

        // Track business entities and the outbox row in the same DbContext.
        foreach (var entity in entities)
        {
            _dbContext.Add(entity);
        }
        _dbContext.OutboxMessages.Add(outboxMessage);

        // Single commit covers business rows + outbox row.
        await _dbContext.SaveChangesAsync(ct);
    }

    /// <summary>
    /// Background pass: publishes pending outbox rows, oldest first.
    /// </summary>
    /// <remarks>
    /// Fixes vs. original: the WaitForConfirms result is no longer ignored —
    /// a broker NACK used to be recorded as Sent; and rows that exhaust their
    /// retry budget are now marked Failed instead of staying Pending forever.
    /// </remarks>
    public async Task ProcessOutboxAsync(CancellationToken ct = default)
    {
        var pendingMessages = await _dbContext.OutboxMessages
            .Where(m => m.Status == OutboxStatus.Pending && m.RetryCount < MaxRetries)
            .OrderBy(m => m.CreatedAt)
            .Take(BatchSize)
            .ToListAsync(ct);

        foreach (var message in pendingMessages)
        {
            try
            {
                // A fresh channel per message isolates channel-level failures
                // (a failed publish closes the channel).
                using var channel = _connection.CreateModel();
                channel.ConfirmSelect();

                var body = Encoding.UTF8.GetBytes(message.Payload);
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                properties.MessageId = message.Id.ToString();
                properties.ContentType = "application/json";

                channel.BasicPublish(
                    exchange: message.Exchange,
                    routingKey: message.RoutingKey,
                    basicProperties: properties,
                    body: body);

                // Treat a NACK exactly like a failed send.
                if (!channel.WaitForConfirms(TimeSpan.FromSeconds(5)))
                {
                    throw new InvalidOperationException("Broker NACKed the message");
                }

                message.Status = OutboxStatus.Sent;
                message.SentAt = DateTime.UtcNow;
            }
            catch (Exception ex)
            {
                message.RetryCount++;
                message.LastError = ex.Message;
                if (message.RetryCount >= MaxRetries)
                {
                    message.Status = OutboxStatus.Failed; // stop retrying; needs operator attention
                }
                _logger.LogError(ex, "Outbox 消息发送失败: {Id}", message.Id);
            }
        }

        await _dbContext.SaveChangesAsync(ct);
    }
}
// ============ Outbox background worker ============
public class OutboxProcessor : BackgroundService
{
    private static readonly TimeSpan PollInterval = TimeSpan.FromSeconds(2);

    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<OutboxProcessor> _logger;

    public OutboxProcessor(IServiceProvider serviceProvider, ILogger<OutboxProcessor> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    // Polls the outbox every two seconds. OutboxService is scoped (it owns a
    // DbContext), so a fresh DI scope is created for every pass.
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                using var scope = _serviceProvider.CreateScope();
                var outboxService = scope.ServiceProvider.GetRequiredService<OutboxService>();
                await outboxService.ProcessOutboxAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Outbox 处理异常");
            }
            await Task.Delay(PollInterval, stoppingToken);
        }
    }
}
四、幂等消费者
// ============ Idempotent consumer ============
public class IdempotentConsumer
{
    private readonly AppDbContext _dbContext;
    private readonly ILogger<IdempotentConsumer> _logger;
    private readonly IMessageHandlerFactory _handlerFactory;

    public IdempotentConsumer(
        AppDbContext dbContext,
        ILogger<IdempotentConsumer> logger,
        IMessageHandlerFactory handlerFactory)
    {
        _dbContext = dbContext;
        _logger = logger;
        _handlerFactory = handlerFactory;
    }

    /// <summary>
    /// Processes a message at most once, keyed on its unique message id.
    /// Throws (after rollback) on handler failure so the broker redelivers.
    /// </summary>
    /// <remarks>
    /// Fix vs. original: the duplicate check used to run BEFORE the
    /// transaction was opened, leaving a window where two concurrent
    /// deliveries of the same message both pass the check and both execute
    /// the handler. The check now runs inside the transaction; for full
    /// safety ProcessedMessages.MessageId must also carry a unique index so
    /// the losing writer fails on commit instead of double-processing.
    /// </remarks>
    public async Task ConsumeAsync<TMessage>(
        TMessage message,
        string messageId,
        CancellationToken ct = default) where TMessage : class
    {
        await using var transaction = await _dbContext.Database.BeginTransactionAsync(ct);
        try
        {
            // Duplicate check inside the transaction narrows the race window.
            var processed = await _dbContext.ProcessedMessages
                .AnyAsync(m => m.MessageId == messageId, ct);
            if (processed)
            {
                _logger.LogInformation("消息已处理,跳过: {MsgId}", messageId);
                return;
            }

            // Business handling and the dedup record commit atomically.
            var handler = _handlerFactory.GetHandler<TMessage>();
            await handler.HandleAsync(message, ct);

            _dbContext.ProcessedMessages.Add(new ProcessedMessage
            {
                MessageId = messageId,
                MessageType = typeof(TMessage).Name,
                ProcessedAt = DateTime.UtcNow,
                Payload = JsonSerializer.Serialize(message)
            });
            await _dbContext.SaveChangesAsync(ct);
            await transaction.CommitAsync(ct);
        }
        catch (Exception ex)
        {
            await transaction.RollbackAsync(ct);
            _logger.LogError(ex, "消息处理失败: {MsgId}", messageId);
            throw; // rethrow so RabbitMQ redelivers
        }
    }
}
// ============ ProcessedMessage table ============
// Dedup ledger for idempotent consumption; one row per handled message.
// NOTE(review): MessageId should be the primary key or carry a unique index —
// the idempotency guarantee depends on it. Verify in the EF configuration.
public class ProcessedMessage
{
    public string MessageId { get; set; } = string.Empty;
    public string MessageType { get; set; } = string.Empty;
    public DateTime ProcessedAt { get; set; }
    // Serialized message body, kept for auditing/replay.
    public string Payload { get; set; } = string.Empty;
}五、RabbitMQ 消费者实现(带手动 ACK 和死信)
// ============ Queue declaration (with dead-letter wiring) ============
public class QueueDeclarationService
{
    private readonly IConnection _connection;

    public QueueDeclarationService(IConnection connection)
    {
        _connection = connection;
    }

    // Declares the dead-letter infrastructure first, then the main order
    // queue whose rejected/expired messages get routed to it.
    public void DeclareOrderQueue()
    {
        using var channel = _connection.CreateModel();

        // Dead-letter exchange + queue + binding.
        channel.ExchangeDeclare("dlx.exchange", ExchangeType.Direct, durable: true);
        channel.QueueDeclare("orders.dead", durable: true, exclusive: false, autoDelete: false);
        channel.QueueBind("orders.dead", "dlx.exchange", "orders.dead");

        // Main-queue arguments: dead-letter routing, TTL, length cap, queue type.
        var queueArguments = new Dictionary<string, object>
        {
            { "x-dead-letter-exchange", "dlx.exchange" },
            { "x-dead-letter-routing-key", "orders.dead" },
            { "x-message-ttl", 86400000 },  // 24h message TTL
            { "x-max-length", 100000 },     // cap on queue depth
            { "x-queue-type", "quorum" }    // quorum queue (RabbitMQ 3.9+)
        };

        channel.ExchangeDeclare("orders.exchange", ExchangeType.Topic, durable: true);
        channel.QueueDeclare("orders.queue", durable: true, exclusive: false,
            autoDelete: false, arguments: queueArguments);
        channel.QueueBind("orders.queue", "orders.exchange", "order.#");
    }
}
// ============ Reliable consumer service (manual ACK + dead-lettering) ============
public class ReliableRabbitMQConsumer : BackgroundService
{
    private readonly IConnection _connection;
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<ReliableRabbitMQConsumer> _logger;
    // Long-lived consumer channel; created in ExecuteAsync, disposed in Dispose.
    private IModel? _channel;

    public ReliableRabbitMQConsumer(
        IConnection connection,
        IServiceProvider serviceProvider,
        ILogger<ReliableRabbitMQConsumer> logger)
    {
        _connection = connection;
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    // Wires up an async consumer on "orders.queue" with manual ACKs and
    // returns immediately — all work happens in the Received callback.
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _channel = _connection.CreateModel();
        // Prefetch: at most 10 unacknowledged messages in flight for this consumer.
        _channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);
        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.Received += async (model, ea) =>
        {
            // NOTE(review): MessageId is null when the publisher did not set it;
            // the null-forgiving `messageId!` below would then feed null into the
            // idempotency check — confirm every publisher sets MessageId.
            var messageId = ea.BasicProperties.MessageId;
            var routingKey = ea.RoutingKey;
            try
            {
                var body = ea.Body.ToArray();
                var json = Encoding.UTF8.GetString(body);
                using var scope = _serviceProvider.CreateScope();
                var idempotentConsumer = scope.ServiceProvider
                    .GetRequiredService<IdempotentConsumer>();
                // Deserialize into a concrete event type based on the routing key.
                if (routingKey.StartsWith("order.created"))
                {
                    var message = JsonSerializer.Deserialize<OrderCreatedEvent>(json);
                    if (message != null)
                        await idempotentConsumer.ConsumeAsync(message, messageId!, stoppingToken);
                }
                else if (routingKey.StartsWith("order.cancelled"))
                {
                    var message = JsonSerializer.Deserialize<OrderCancelledEvent>(json);
                    if (message != null)
                        await idempotentConsumer.ConsumeAsync(message, messageId!, stoppingToken);
                }
                // Manual ACK only after successful processing.
                _channel.BasicAck(ea.DeliveryTag, multiple: false);
                _logger.LogInformation("消息已处理: {MsgId}", messageId);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "消息处理失败: {MsgId}", messageId);
                // Decide between requeue and dead-lettering.
                var retryCount = GetRetryCount(ea);
                if (retryCount < 3)
                {
                    // Reject and requeue for another attempt.
                    // NOTE(review): requeue:true does NOT add an x-death entry —
                    // x-death only grows when a message passes through the DLX —
                    // so GetRetryCount stays 0 on this path and a permanently
                    // failing message can loop forever. Verify the intended retry
                    // flow (e.g. route retries through a TTL wait queue instead).
                    _channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: true);
                    _logger.LogWarning("消息重新入队,重试次数: {Count}", retryCount);
                }
                else
                {
                    // Retry budget exhausted: drop to the dead-letter exchange.
                    _channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
                    _logger.LogError("消息进入死信队列: {MsgId}", messageId);
                }
            }
        };
        _channel.BasicConsume(queue: "orders.queue", autoAck: false, consumer: consumer);
        return Task.CompletedTask;
    }

    // Reads the dead-letter count from the x-death header (populated by the
    // broker each time the message passes through a DLX); 0 when absent.
    // NOTE(review): the casts assume the client maps x-death to
    // List<object>/Dictionary<string, object> — confirm against the client version.
    private static int GetRetryCount(BasicDeliverEventArgs ea)
    {
        if (ea.BasicProperties.Headers != null &&
            ea.BasicProperties.Headers.TryGetValue("x-death", out var xDeath))
        {
            var deaths = (List<object>)xDeath;
            if (deaths.Count > 0)
            {
                var death = (Dictionary<string, object>)deaths[0];
                if (death.TryGetValue("count", out var count))
                    return Convert.ToInt32(count);
            }
        }
        return 0;
    }

    public override void Dispose()
    {
        _channel?.Dispose();
        base.Dispose();
    }
}六、延迟消息
// ============ Delayed messages via TTL holding queues + DLX ============
public class DelayedMessageService
{
    private readonly IConnection _connection;
    private readonly ILogger<DelayedMessageService> _logger;

    public DelayedMessageService(IConnection connection, ILogger<DelayedMessageService> logger)
    {
        _connection = connection;
        _logger = logger;
    }

    /// <summary>
    /// Publishes a message into a TTL-bound holding queue; when the TTL
    /// expires the broker dead-letters it into "orders.exchange" under the
    /// requested routing key — which is what makes it "delayed".
    /// </summary>
    /// <remarks>
    /// Fix vs. original: the holding-queue name was keyed only by the delay
    /// ("delay.{seconds}s") while its arguments embedded the routing key, so
    /// a second call with the same delay but a different routing key hit a
    /// PRECONDITION_FAILED redeclare error (and earlier messages would have
    /// been mis-routed). The routing key is now part of the queue name so
    /// each (delay, routingKey) pair gets its own queue.
    /// </remarks>
    public void SendDelayed<T>(T message, string routingKey, TimeSpan delay)
    {
        using var channel = _connection.CreateModel();

        // One holding queue per (delay, routing key) pair.
        var holdingQueue = $"delay.{routingKey}.{delay.TotalSeconds}s";
        var holdingQueueArgs = new Dictionary<string, object>
        {
            { "x-dead-letter-exchange", "orders.exchange" },
            { "x-dead-letter-routing-key", routingKey },
            { "x-message-ttl", (int)delay.TotalMilliseconds }
        };
        channel.QueueDeclare(holdingQueue, durable: true, exclusive: false,
            autoDelete: false, arguments: holdingQueueArgs);

        // Publish straight to the holding queue via the default exchange.
        var body = JsonSerializer.SerializeToUtf8Bytes(message);
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;
        properties.MessageId = Guid.NewGuid().ToString();
        channel.BasicPublish(
            exchange: "",
            routingKey: holdingQueue,
            basicProperties: properties,
            body: body);

        _logger.LogInformation("延迟消息已发送: 延迟 {Seconds}s, 路由 {Key}",
            delay.TotalSeconds, routingKey);
    }

    /// <summary>
    /// Schedules the 30-minute payment-timeout check for an order.
    /// </summary>
    public void ScheduleOrderTimeoutCheck(string orderId)
    {
        SendDelayed(
            new OrderTimeoutCheckEvent { OrderId = orderId },
            "order.timeout.check",
            TimeSpan.FromMinutes(30));
    }
}
七、Kafka 可靠性配置
// ============ Kafka producer configuration ============
// NuGet: Confluent.Kafka
builder.Services.AddSingleton<IProducer<string, string>>(sp =>
{
    var config = sp.GetRequiredService<IConfiguration>();
    var producerConfig = new ProducerConfig
    {
        BootstrapServers = config["Kafka:BootstrapServers"],
        // Wait for all in-sync replicas to acknowledge (strongest durability).
        Acks = Acks.All,
        // Retries for transient broker errors.
        MessageSendMaxRetries = 3,
        RetryBackoffMs = 1000,
        // Idempotent producer: the broker de-duplicates retried sends.
        EnableIdempotence = true,
        // Batching.
        BatchSize = 16384,
        LingerMs = 5,
        // Client-side buffering limits.
        QueueBufferingMaxMessages = 100000,
        QueueBufferingMaxKbytes = 1048576,
        // Request / end-to-end delivery timeouts.
        RequestTimeoutMs = 30000,
        MessageTimeoutMs = 60000,
        // Compression.
        CompressionType = CompressionType.Lz4
    };
    return new ProducerBuilder<string, string>(producerConfig).Build();
});
// ============ Kafka reliable consumer configuration ============
builder.Services.AddSingleton<IConsumer<string, string>>(sp =>
{
    var config = sp.GetRequiredService<IConfiguration>();
    var consumerConfig = new ConsumerConfig
    {
        BootstrapServers = config["Kafka:BootstrapServers"],
        GroupId = config["Kafka:GroupId"] ?? "myapp-consumer-group",
        // Start from the earliest offset when the group has none (avoids missing messages).
        AutoOffsetReset = AutoOffsetReset.Earliest,
        // Offsets are committed manually, only after successful processing.
        EnableAutoCommit = false,
        // Poll / session liveness tuning.
        MaxPollIntervalMs = 300000,
        SessionTimeoutMs = 30000,
        HeartbeatIntervalMs = 10000,
        // Max bytes fetched per partition per request.
        MaxPartitionFetchBytes = 1048576
    };
    return new ConsumerBuilder<string, string>(consumerConfig).Build();
});
// ============ Kafka consumer service ============
public class KafkaConsumerService : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<KafkaConsumerService> _logger;
    private readonly KafkaConsumerOptions _options;

    public KafkaConsumerService(
        IConsumer<string, string> consumer,
        IServiceProvider serviceProvider,
        ILogger<KafkaConsumerService> logger,
        KafkaConsumerOptions options)
    {
        _consumer = consumer;
        _serviceProvider = serviceProvider;
        _logger = logger;
        _options = options;
    }

    // Consume loop with manual offset commits: the offset is committed only
    // after the idempotent consumer has finished processing the record.
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _consumer.Subscribe(_options.Topics);
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var consumeResult = _consumer.Consume(stoppingToken);
                using var scope = _serviceProvider.CreateScope();
                var idempotentConsumer = scope.ServiceProvider
                    .GetRequiredService<IdempotentConsumer>();
                var topic = consumeResult.Topic;
                var key = consumeResult.Message.Key;
                var value = consumeResult.Message.Value;
                var offset = consumeResult.Offset;
                var partition = consumeResult.Partition;
                _logger.LogDebug("消费消息: Topic={Topic}, Key={Key}, Partition={Part}, Offset={Offset}",
                    topic, key, partition, offset);
                // Route to a CLR message type based on the topic.
                var messageType = _options.TopicMessageTypeMap.GetValueOrDefault(topic);
                if (messageType != null)
                {
                    var message = JsonSerializer.Deserialize(value, messageType);
                    if (message != null)
                    {
                        // topic+partition+offset uniquely identifies the record,
                        // so it doubles as the idempotency key.
                        var messageId = $"{topic}:{partition.Value}:{offset.Value}";
                        // NOTE(review): (dynamic) makes ConsumeAsync<TMessage> bind
                        // at runtime to the deserialized type — a handler-factory
                        // miss only surfaces at runtime; consider a typed dispatch map.
                        await idempotentConsumer.ConsumeAsync(
                            (dynamic)message, messageId, stoppingToken);
                    }
                }
                // Commit the offset only after successful processing.
                _consumer.Commit(consumeResult);
            }
            catch (ConsumeException ex)
            {
                _logger.LogError(ex, "Kafka 消费异常: {Reason}", ex.Error.Reason);
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "消息处理异常");
                // Offset intentionally not committed. NOTE(review): the in-memory
                // consume position has still advanced, so the failed record is only
                // re-delivered after a rebalance/restart — confirm this is acceptable.
            }
        }
        _consumer.Close();
    }
}八、消息去重策略
// ============ Layered de-duplication ============
public class MessageDeduplicationService
{
    private readonly IDistributedCache _cache;
    private readonly AppDbContext _dbContext;
    private readonly ILogger<MessageDeduplicationService> _logger;

    public MessageDeduplicationService(
        IDistributedCache cache,
        AppDbContext dbContext,
        ILogger<MessageDeduplicationService> logger)
    {
        _cache = cache;
        _dbContext = dbContext;
        _logger = logger;
    }

    /// <summary>
    /// Two-layer duplicate check: a fast distributed-cache lookup first,
    /// then an exact database query as the source of truth.
    /// </summary>
    public async Task<bool> IsDuplicateAsync(string messageId, CancellationToken ct = default)
    {
        // Layer 1: one cache key per message id. (The original comment called
        // this a Bloom filter — it is a plain exact-key lookup, not one.)
        var cacheKey = $"dedup:{messageId}";
        var cached = await _cache.GetStringAsync(cacheKey, ct);
        if (cached != null)
        {
            _logger.LogDebug("Redis 去重命中: {MsgId}", messageId);
            return true;
        }
        // Layer 2: exact database check.
        var exists = await _dbContext.ProcessedMessages
            .AnyAsync(m => m.MessageId == messageId, ct);
        if (exists)
        {
            // Back-fill the cache so the next check short-circuits.
            await _cache.SetStringAsync(cacheKey, "1", new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(24)
            }, ct);
            return true;
        }
        return false;
    }

    /// <summary>
    /// Records a message as processed in both the database and the cache.
    /// NOTE(review): the two writes are not atomic — a crash between them
    /// leaves the cache entry missing for up to one pass, which is safe
    /// because the database is the authoritative layer.
    /// </summary>
    public async Task MarkAsProcessedAsync(string messageId, string payload,
        CancellationToken ct = default)
    {
        // Durable record first.
        _dbContext.ProcessedMessages.Add(new ProcessedMessage
        {
            MessageId = messageId,
            ProcessedAt = DateTime.UtcNow,
            Payload = payload
        });
        await _dbContext.SaveChangesAsync(ct);
        // Then the fast-path cache entry (24h TTL).
        await _cache.SetStringAsync(
            $"dedup:{messageId}", "1",
            new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromHours(24)
            }, ct);
    }
}九、消息追踪
// ============ End-to-end message tracing ============
public class MessageTracingMiddleware
{
    private readonly ILogger<MessageTracingMiddleware> _logger;
    private readonly IMessageTraceRepository _traceRepository;

    // Fix vs. original: the class had no constructor, so both readonly
    // fields were never assigned and every TraceAsync call would throw
    // NullReferenceException.
    public MessageTracingMiddleware(
        ILogger<MessageTracingMiddleware> logger,
        IMessageTraceRepository traceRepository)
    {
        _logger = logger;
        _traceRepository = traceRepository;
    }

    /// <summary>
    /// Records a MessageTrace row around <paramref name="processAction"/>:
    /// saved as "Processing" up-front, updated to "Completed" or "Failed"
    /// (with the elapsed duration) in the finally block. Rethrows on failure.
    /// </summary>
    public async Task TraceAsync<TMessage>(
        TMessage message,
        string messageId,
        string source,
        string destination,
        Func<Task> processAction)
    {
        // Reuse the ambient Activity id when present so the trace joins the
        // surrounding distributed trace.
        var traceId = Activity.Current?.Id ?? Guid.NewGuid().ToString();
        var stopwatch = Stopwatch.StartNew();
        var trace = new MessageTrace
        {
            TraceId = traceId,
            MessageId = messageId,
            MessageType = typeof(TMessage).Name,
            Source = source,
            Destination = destination,
            Status = "Processing",
            StartedAt = DateTime.UtcNow
        };
        await _traceRepository.SaveAsync(trace);
        try
        {
            await processAction();
            stopwatch.Stop();
            trace.Status = "Completed";
            trace.Duration = stopwatch.ElapsedMilliseconds;
            trace.CompletedAt = DateTime.UtcNow;
            _logger.LogInformation("消息处理完成: {MsgId}, 耗时 {Ms}ms", messageId, stopwatch.ElapsedMilliseconds);
        }
        catch (Exception ex)
        {
            stopwatch.Stop();
            trace.Status = "Failed";
            trace.Error = ex.Message;
            trace.Duration = stopwatch.ElapsedMilliseconds;
            _logger.LogError(ex, "消息处理失败: {MsgId}", messageId);
            throw;
        }
        finally
        {
            // Persist the final state on both paths.
            await _traceRepository.UpdateAsync(trace);
        }
    }
}
// One row per message-processing attempt; written by MessageTracingMiddleware.
public class MessageTrace
{
    public string TraceId { get; set; } = string.Empty;
    public string MessageId { get; set; } = string.Empty;
    public string MessageType { get; set; } = string.Empty;
    public string Source { get; set; } = string.Empty;
    public string Destination { get; set; } = string.Empty;
    // "Processing" | "Completed" | "Failed"
    public string Status { get; set; } = string.Empty;
    public string? Error { get; set; }
    // Elapsed processing time in milliseconds.
    public long Duration { get; set; }
    public DateTime StartedAt { get; set; }
    // Set only on the success path (review note: failures leave it null).
    public DateTime? CompletedAt { get; set; }
}十、监控与告警
// ============ Message-broker health check ============
public class RabbitMQHealthCheck : IHealthCheck
{
    private readonly IConnection _connection;

    public RabbitMQHealthCheck(IConnection connection)
    {
        _connection = connection;
    }

    // Healthy while the AMQP connection is open; the check is purely
    // in-memory, so the result is returned as an already-completed task.
    public Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context, CancellationToken ct = default)
    {
        var result = _connection.IsOpen
            ? HealthCheckResult.Healthy("RabbitMQ 连接正常")
            : HealthCheckResult.Unhealthy("RabbitMQ 连接已断开");
        return Task.FromResult(result);
    }
}
// ============ Queue backlog monitor ============
public class QueueMonitorService : BackgroundService
{
    private static readonly string[] MonitoredQueues =
        { "orders.queue", "orders.dead", "notifications.queue" };

    private readonly IConnection _connection;
    private readonly ILogger<QueueMonitorService> _logger;
    private readonly IMetricsCollector _metrics;

    public QueueMonitorService(
        IConnection connection,
        ILogger<QueueMonitorService> logger,
        IMetricsCollector metrics)
    {
        _connection = connection;
        _logger = logger;
        _metrics = metrics;
    }

    // Every 30 seconds: read depth and consumer counts for the monitored
    // queues, emit gauges, and log backlog / dead-letter alerts.
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                using var channel = _connection.CreateModel();
                foreach (var queueName in MonitoredQueues)
                {
                    var declareOk = channel.QueueDeclarePassive(queueName);
                    var depth = declareOk.MessageCount;
                    var consumers = declareOk.ConsumerCount;

                    _metrics.RecordGauge($"mq.queue.messages", depth, tags: $"queue={queueName}");
                    _metrics.RecordGauge($"mq.queue.consumers", consumers, tags: $"queue={queueName}");

                    // Backlog alert.
                    if (depth > 1000)
                    {
                        _logger.LogWarning("队列 {Queue} 积压 {Count} 条消息,消费者数量: {Consumers}",
                            queueName, depth, consumers);
                    }
                    // Any dead-lettered message warrants attention.
                    if (queueName.Contains("dead") && depth > 0)
                    {
                        _logger.LogError("死信队列 {Queue} 有 {Count} 条消息需要处理",
                            queueName, depth);
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "队列监控异常");
            }
            await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
        }
    }
}
十一、Kafka Exactly-Once 实现模式
// ============ Kafka transactional producer ============
// NOTE: requires `TransactionalId` in the ProducerConfig, otherwise
// InitTransactions throws.
public class KafkaTransactionalProducer
{
    private static readonly TimeSpan TransactionTimeout = TimeSpan.FromSeconds(10);

    private readonly IProducer<string, string> _producer;
    private readonly ILogger<KafkaTransactionalProducer> _logger;
    // InitTransactions must run exactly once per producer instance.
    // NOTE(review): this flag is not thread-safe — guard externally if the
    // service is called concurrently.
    private bool _initialized;

    public KafkaTransactionalProducer(
        IProducer<string, string> producer,
        ILogger<KafkaTransactionalProducer> logger)
    {
        _producer = producer;
        _logger = logger;
    }

    /// <summary>
    /// Sends a batch of messages atomically: consumers see all of them or
    /// none of them.
    /// </summary>
    /// <remarks>
    /// Fixes vs. original: InitTransactions was called on every batch —
    /// it must be called once per producer lifetime and throws on repeat
    /// calls; and the input sequence was enumerated twice (foreach +
    /// Count()), which breaks for one-shot enumerables.
    /// </remarks>
    public async Task SendTransactionalBatchAsync(
        IEnumerable<(string Topic, string Key, string Value)> messages,
        CancellationToken ct = default)
    {
        // Materialize once so the sequence is enumerated a single time.
        var batch = messages as IReadOnlyCollection<(string Topic, string Key, string Value)>
                    ?? messages.ToList();

        if (!_initialized)
        {
            _producer.InitTransactions(TransactionTimeout);
            _initialized = true;
        }

        try
        {
            _producer.BeginTransaction();
            foreach (var (topic, key, value) in batch)
            {
                await _producer.ProduceAsync(topic, new Message<string, string>
                {
                    Key = key,
                    Value = value,
                    Headers = new Headers
                    {
                        { "source", Encoding.UTF8.GetBytes("order-service") },
                        { "timestamp", Encoding.UTF8.GetBytes(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString()) }
                    }
                }, ct);
            }
            _producer.CommitTransaction(TransactionTimeout);
            _logger.LogInformation("事务消息批量提交成功,共 {Count} 条", batch.Count);
        }
        catch (Exception ex)
        {
            _producer.AbortTransaction(TransactionTimeout);
            _logger.LogError(ex, "事务消息提交失败,已回滚");
            throw;
        }
    }
}
// ============ Consume-Transform-Produce pattern ============
// Consumes one topic, transforms each record, produces to another topic;
// the consumed offset is committed inside the producer transaction for
// Exactly-Once semantics.
public class ConsumeTransformProduceService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly IProducer<string, string> _producer;
    private readonly ILogger _logger;

    /// <summary>
    /// Runs the consume-transform-produce loop until cancellation.
    /// </summary>
    /// <remarks>
    /// Fixes vs. original: InitTransactions is now called once before the
    /// loop — calling it per message (as before) throws on the second
    /// iteration; the offset sent to the transaction is the consumed offset
    /// + 1, since committing the consumed offset itself reprocesses the last
    /// record after a restart; and aborted transactions are logged instead
    /// of being silently swallowed.
    /// </remarks>
    public async Task ProcessAsync(string inputTopic, string outputTopic, CancellationToken ct)
    {
        // Once per producer lifetime.
        _producer.InitTransactions(TimeSpan.FromSeconds(10));
        _consumer.Subscribe(inputTopic);
        while (!ct.IsCancellationRequested)
        {
            var result = _consumer.Consume(ct);
            try
            {
                _producer.BeginTransaction();

                // Transform and write to the output topic.
                var transformed = TransformMessage(result.Message.Value);
                await _producer.ProduceAsync(outputTopic, new Message<string, string>
                {
                    Key = result.Message.Key,
                    Value = transformed
                }, ct);

                // Commit the NEXT offset to consume as part of the transaction.
                var groupMetadata = _consumer.ConsumerGroupMetadata;
                var offsets = new List<TopicPartitionOffset>
                {
                    new TopicPartitionOffset(result.TopicPartition, new Offset(result.Offset.Value + 1))
                };
                _producer.SendOffsetsToTransaction(
                    offsets, groupMetadata, TimeSpan.FromSeconds(10));
                _producer.CommitTransaction(TimeSpan.FromSeconds(10));
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Consume-Transform-Produce 事务已回滚");
                _producer.AbortTransaction(TimeSpan.FromSeconds(10));
            }
        }
    }

    // Business transformation hook (identity for now).
    private string TransformMessage(string input)
    {
        return input;
    }
}
优点
- 数据不丢失:Outbox 模式 + Publisher Confirm + 手动 ACK 三重保障
- 精确一次处理:幂等消费者 + 事务消费模式
- 故障自愈:死信队列 + 重试策略 + 延迟重试
- 可追踪:全链路消息 ID 关联,快速定位问题
缺点
- 复杂度显著增加:每个可靠性保障措施都增加了一层抽象
- 性能损耗:事务、确认机制、幂等检查都有性能开销
- 运维成本:需要监控队列积压、死信队列、消费者 Lag 等指标
- 存储成本:Outbox 表和 ProcessedMessage 表会持续增长,需要清理策略
性能注意事项
- RabbitMQ Prefetch:设置合理的 prefetchCount,太小浪费吞吐,太大导致消息积压
- Kafka 批量拉取:适当增大 fetch.min.bytes 和 fetch.max.wait.ms 提升吞吐
- Outbox 表清理:定期清理已发送的 Outbox 记录,避免表过大影响查询
- 连接复用:RabbitMQ 中 Channel 是轻量的,但 Connection 是重的,应复用 Connection
- 消息大小:消息体控制在 1MB 以内,超大消息应使用引用模式(消息体存对象存储)
总结
消息可靠性不是一个单一的技术问题,而是一个系统性工程。在生产端,Transactional Outbox 模式确保业务操作和消息发送的原子性;在存储端,消息持久化和副本机制确保消息不因 Broker 故障而丢失;在消费端,手动 ACK 和幂等消费确保消息被正确处理。三个环节缺一不可,只有同时保障才能实现真正的端到端可靠性。
关键知识点
| 知识点 | 要点 |
|---|---|
| Publisher Confirm | RabbitMQ 生产者确认机制 |
| Transactional Outbox | 业务操作与消息发送在同一事务中 |
| 手动 ACK | 消费者处理完消息后手动确认 |
| 死信队列(DLQ) | 处理失败的消息的最终归宿 |
| 幂等消费 | 基于唯一 ID 去重,确保重复消费不产生副作用 |
| Kafka Exactly-Once | 事务 + 幂等 + 消费偏移量纳入事务 |
| 延迟消息 | TTL + 死信实现定时投递 |
| 消费者组 | Kafka 中同一组内的消费者分摊分区 |
常见误区
误区:开了自动 ACK 就安全了
- 事实:自动 ACK 在消息投递时就确认,消费者处理失败消息会丢失
误区:RabbitMQ 消息持久化就不会丢
- 事实:持久化消息可能还在 Page Cache 中,Broker 宕机时可能丢失
误区:Kafka 副本机制保证 Exactly-Once
- 事实:副本只保证 At-Least-Once,Exactly-Once 需要幂等消费者配合
误区:Outbox 模式能替代消息队列
- 事实:Outbox 解决的是"发消息"的可靠性,不是消息路由和分发
误区:消息顺序在所有场景都能保证
- 事实:顺序只在单一分区/队列内保证,跨分区/队列无序
进阶路线
- 云原生消息:Dapr Pub/Sub 抽象、CloudEvents 规范
- 事件溯源(Event Sourcing):以事件序列作为数据存储的源
- CQRS:命令与查询分离,消息队列作为同步通道
- Schema Registry:Confluent Schema Registry 管理 Kafka 消息的 Schema 演化
- 流处理:Kafka Streams / Flink 实现复杂的事件流处理
适用场景
| 场景 | 推荐方案 |
|---|---|
| 订单处理 | RabbitMQ + Outbox + 幂等消费 |
| 日志收集 | Kafka + 分区 + 批量消费 |
| 通知推送 | RabbitMQ + 延迟消息 + 死信 |
| 数据同步 | Kafka + Connect + Exactly-Once |
| 异步任务 | RabbitMQ + TTL + 死信重试 |
| 事件广播 | Kafka + 消费者组 + 多 Topic |
落地建议
- 第一步:基础设施。部署 RabbitMQ/Kafka 集群,配置持久化和监控
- 第二步:生产者改造。引入 Outbox 模式,替代直接发送消息
- 第三步:消费者改造。关闭自动 ACK,实现手动确认
- 第四步:幂等消费。为每个消费者添加去重检查
- 第五步:死信处理。配置死信队列,建立死信处理流程
- 第六步:监控告警。队列积压、消费延迟、错误率告警
- 第七步:压力测试。模拟高并发场景,验证可靠性
排错清单
复盘问题
- 在过去一个月中,是否有消息丢失的事件?根因是什么?
- 死信队列中积压了多少消息?平均多长时间处理一次?
- Outbox 表中未发送的消息有多少?延迟中位数是多少?
- 消费者的平均处理延迟是多少?P99 呢?
- 如果 RabbitMQ/Kafka 集群宕机 30 分钟,业务会受到什么影响?
