Message Queues in Depth (RabbitMQ/Kafka)
Introduction
Message queues are core infrastructure for asynchronous communication between microservices. RabbitMQ suits task queues and point-to-point messaging; Kafka suits high-throughput event streaming. Understanding the architectural differences, reliability guarantees, and best practices of the two brokers helps you choose the right messaging solution.
RabbitMQ Deep Dive
Exchanges and Routing
// dotnet add package RabbitMQ.Client
// RabbitMQ core concepts:
// Exchange → Routing Key → Queue → Consumer
//
// Exchange types:
// Direct  — exact routing-key match
// Fanout  — broadcast to every bound queue
// Topic   — wildcard match (* = exactly one word, # = zero or more words)
// Headers — match on message headers
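// Illustrative matching examples (routing keys here are hypothetical):
// binding key "order.*"         matches "order.created" but not "order.created.vip"
// binding key "order.#"         matches "order", "order.created", "order.created.vip"
// binding key "order.created.#" also matches plain "order.created" itself,
// which is why the queues declared below receive "order.created" events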
// 1. Declaring exchanges and queues
public class RabbitMQTopology
{
    private readonly IConnection _connection;
    private readonly ILogger<RabbitMQTopology> _logger;

    public RabbitMQTopology(IConnection connection, ILogger<RabbitMQTopology> logger)
    {
        _connection = connection;
        _logger = logger;
    }

    public void DeclareTopology()
    {
        using var channel = _connection.CreateModel();

        // Declare exchanges
        channel.ExchangeDeclare("order.events", ExchangeType.Topic, durable: true);
        channel.ExchangeDeclare("order.dlx", ExchangeType.Direct, durable: true); // dead-letter exchange

        // Declare queues (with dead-letter configuration)
        channel.QueueDeclare("order.created.inventory", durable: true,
            exclusive: false, autoDelete: false,
            arguments: new Dictionary<string, object>
            {
                ["x-dead-letter-exchange"] = "order.dlx",
                ["x-dead-letter-routing-key"] = "order.created.inventory",
                ["x-message-ttl"] = 3600000, // 1 hour TTL
                ["x-max-length"] = 10000,
                ["x-queue-type"] = "quorum" // quorum queue (high availability)
            });
        channel.QueueDeclare("order.created.notification", durable: true,
            exclusive: false, autoDelete: false,
            arguments: new Dictionary<string, object>
            {
                ["x-dead-letter-exchange"] = "order.dlx",
                // Without an explicit dead-letter routing key the message keeps its
                // original key (e.g. "order.created") and would miss the binding below.
                ["x-dead-letter-routing-key"] = "order.created.notification"
            });

        // Dead-letter queue (exclusive/autoDelete default to true, so set them explicitly)
        channel.QueueDeclare("order.dead-letter", durable: true,
            exclusive: false, autoDelete: false);

        // Bindings
        channel.QueueBind("order.created.inventory", "order.events", "order.created.#");
        channel.QueueBind("order.created.notification", "order.events", "order.created.#");
        channel.QueueBind("order.dead-letter", "order.dlx", "order.created.inventory");
        channel.QueueBind("order.dead-letter", "order.dlx", "order.created.notification");

        _logger.LogInformation("RabbitMQ topology declared");
    }
}
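The classes in this section assume a registered IConnection. A minimal registration sketch in Program.cs (host and recovery settings are assumptions, adjust to your environment); note that the AsyncEventingBasicConsumer used further down requires DispatchConsumersAsync = true on the factory:

// Minimal sketch of the assumed Program.cs wiring.
builder.Services.AddSingleton<IConnectionFactory>(_ => new ConnectionFactory
{
    HostName = "localhost",         // assumption: local broker
    DispatchConsumersAsync = true,  // required for AsyncEventingBasicConsumer below
    AutomaticRecoveryEnabled = true // reconnect after broker/network failures
});
builder.Services.AddSingleton<IConnection>(sp =>
    sp.GetRequiredService<IConnectionFactory>().CreateConnection());
builder.Services.AddSingleton<RabbitMQTopology>();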
// 2. Reliable message publishing
public class ReliableMessagePublisher
{
    private readonly IConnection _connection;
    private readonly ILogger<ReliableMessagePublisher> _logger;
    private readonly JsonSerializerOptions _jsonOptions = new() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };

    public ReliableMessagePublisher(IConnection connection, ILogger<ReliableMessagePublisher> logger)
    {
        _connection = connection;
        _logger = logger;
    }

    public Task PublishAsync<T>(T message, string exchange, string routingKey,
        CancellationToken ct = default)
    {
        // A channel per publish keeps the example simple; high-volume publishers
        // should pool channels rather than recreate one per call.
        using var channel = _connection.CreateModel();

        // Enable publisher confirms
        channel.ConfirmSelect();

        var properties = channel.CreateBasicProperties();
        properties.Persistent = true; // persist to disk (equivalent to DeliveryMode = 2)
        properties.MessageId = Guid.NewGuid().ToString();
        properties.ContentType = "application/json";
        properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());

        // Propagate the trace context
        if (Activity.Current != null)
        {
            properties.Headers = new Dictionary<string, object>
            {
                ["traceparent"] = $"00-{Activity.Current.TraceId}-{Activity.Current.SpanId}-01"
            };
        }

        var body = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions);

        // Publish
        channel.BasicPublish(exchange, routingKey, properties, body);

        // Wait for the broker to confirm the publish
        if (!channel.WaitForConfirms(TimeSpan.FromSeconds(5)))
        {
            throw new InvalidOperationException("Publish was not confirmed by the broker");
        }

        _logger.LogInformation("Message published: {Exchange}/{RoutingKey} ({MessageId})",
            exchange, routingKey, properties.MessageId);
        return Task.CompletedTask; // the 6.x client calls used here are synchronous
    }
}
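A hypothetical call site, assuming the Program.cs context sketched above and the OrderCreatedEvent/OrderItem event types referenced by the consumer below:

// Hypothetical usage; the event-type shapes are assumptions.
var publisher = app.Services.GetRequiredService<ReliableMessagePublisher>();
await publisher.PublishAsync(
    new OrderCreatedEvent(Guid.NewGuid(), new List<OrderItem>()),
    exchange: "order.events",
    routingKey: "order.created"); // the "order.created.#" bindings match this key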
// 3. Reliable consumption
public class ReliableMessageConsumer : BackgroundService
{
    private readonly IConnectionFactory _connectionFactory;
    private readonly IServiceProvider _sp;
    private readonly ILogger<ReliableMessageConsumer> _logger;
    private readonly string _queueName;

    public ReliableMessageConsumer(
        IConnectionFactory connectionFactory,
        IServiceProvider sp,
        ILogger<ReliableMessageConsumer> logger,
        string queueName)
    {
        _connectionFactory = connectionFactory;
        _sp = sp;
        _logger = logger;
        _queueName = queueName;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Connection and channel live for the service lifetime; a production
        // version would keep references and dispose them on shutdown.
        var connection = _connectionFactory.CreateConnection();
        var channel = connection.CreateModel();

        // Prefetch (QoS): at most 10 unacked messages per consumer
        channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += async (model, ea) =>
        {
            try
            {
                using var scope = _sp.CreateScope();
                await ProcessMessageAsync(scope.ServiceProvider, ea, stoppingToken);
                // Manual acknowledgement
                channel.BasicAck(ea.DeliveryTag, multiple: false);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to process message: {RoutingKey}", ea.RoutingKey);
                // Note: requeue: true never increments the x-death header, so a
                // plain requeue loop can never reach a retry limit. Reject without
                // requeue instead: the message dead-letters, and a TTL retry queue
                // (sketched after this class) can feed it back, incrementing
                // x-death once per cycle.
                var retryCount = GetRetryCount(ea);
                if (retryCount < 3)
                {
                    channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
                }
                else
                {
                    // Retry budget exhausted: park the message in the dead-letter
                    // queue explicitly, then ack so it stops cycling.
                    channel.BasicPublish("order.dlx", _queueName, ea.BasicProperties, ea.Body);
                    channel.BasicAck(ea.DeliveryTag, multiple: false);
                }
            }
        };

        channel.BasicConsume(_queueName, autoAck: false, consumer);
        return Task.CompletedTask;
    }
    private int GetRetryCount(BasicDeliverEventArgs ea)
    {
        // The broker appends an "x-death" entry each time the message is
        // dead-lettered; its "count" field tracks the number of cycles.
        if (ea.BasicProperties.Headers?.TryGetValue("x-death", out var death) == true)
        {
            var deaths = (List<object>)death;
            if (deaths.Count > 0)
            {
                var deathEntry = (Dictionary<string, object>)deaths[0];
                return Convert.ToInt32(deathEntry["count"]);
            }
        }
        return 0;
    }
    private async Task ProcessMessageAsync(IServiceProvider sp, BasicDeliverEventArgs ea, CancellationToken ct)
    {
        // Dispatch by routing key
        var body = ea.Body.ToArray();
        var json = Encoding.UTF8.GetString(body);
        switch (ea.RoutingKey)
        {
            case "order.created":
                var orderCreated = JsonSerializer.Deserialize<OrderCreatedEvent>(json);
                var inventoryService = sp.GetRequiredService<IInventoryService>();
                await inventoryService.ReserveStockAsync(orderCreated!.OrderId, orderCreated.Items, ct);
                break;
            case "order.paid":
                var orderPaid = JsonSerializer.Deserialize<OrderPaidEvent>(json);
                var shippingService = sp.GetRequiredService<IShippingService>();
                await shippingService.ScheduleDeliveryAsync(orderPaid!.OrderId, ct);
                break;
        }
    }
}
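Because x-death only advances on dead-lettering, spaced retries are usually built with a TTL "wait" queue that dead-letters expired messages back to the business exchange. A minimal sketch with hypothetical names, shown as a variation rather than part of the topology declared earlier (the work queue's x-dead-letter-exchange would point at order.retry instead of order.dlx):

// Hypothetical TTL-based retry loop (all names are illustrative):
// work queue --nack--> order.retry --TTL expires--> order.events --> work queue
channel.ExchangeDeclare("order.retry", ExchangeType.Direct, durable: true);
channel.QueueDeclare("order.created.inventory.retry", durable: true,
    exclusive: false, autoDelete: false,
    arguments: new Dictionary<string, object>
    {
        ["x-message-ttl"] = 10000,                      // wait 10 s between attempts
        ["x-dead-letter-exchange"] = "order.events",    // then back to the main exchange
        ["x-dead-letter-routing-key"] = "order.created" // re-deliver to the work queues
    });
channel.QueueBind("order.created.inventory.retry", "order.retry", "order.created.inventory");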
Kafka Deep Dive
Producers and Consumers
// dotnet add package Confluent.Kafka
// Kafka core concepts:
// Topic → Partition → Offset
// Consumer group — consumers in the same group split the partitions between them
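// Example: a topic with 6 partitions and a 3-consumer group gives each
// consumer 2 partitions; a 7th consumer in the same group would sit idle.
// Ordering is only guaranteed within a partition, and messages sharing a
// key always hash to the same partition.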
// 1. Producer configuration
public class KafkaProducerService
{
    private readonly IProducer<string, string> _producer;
    private readonly ILogger<KafkaProducerService> _logger;
    private readonly string _bootstrapServers;

    public KafkaProducerService(IConfiguration config, ILogger<KafkaProducerService> logger)
    {
        _logger = logger;
        _bootstrapServers = config["Kafka:BootstrapServers"]!;
        var producerConfig = new ProducerConfig
        {
            BootstrapServers = _bootstrapServers,
            Acks = Acks.All,                       // wait for all in-sync replicas
            EnableIdempotence = true,              // deduplicate producer retries (requires Acks.All)
            LingerMs = 20,                         // batching delay
            BatchNumMessages = 100,                // batch size
            CompressionType = CompressionType.Lz4, // compression
            MessageMaxBytes = 1048576,             // 1 MB max
            MessageSendMaxRetries = 3,             // retry failed sends up to 3 times
            RetryBackoffMs = 100
        };
        _producer = new ProducerBuilder<string, string>(producerConfig).Build();
    }

    public async Task ProduceAsync<T>(string topic, string key, T value,
        CancellationToken ct = default)
    {
        var json = JsonSerializer.Serialize(value);
        var message = new Message<string, string>
        {
            Key = key,
            Value = json,
            Headers = new Headers()
        };

        // Propagate the trace context
        if (Activity.Current != null)
        {
            message.Headers.Add("traceparent",
                Encoding.UTF8.GetBytes($"00-{Activity.Current.TraceId}-{Activity.Current.SpanId}-01"));
        }

        var result = await _producer.ProduceAsync(topic, message, ct);
        _logger.LogInformation(
            "Message sent: Topic={Topic}, Partition={Partition}, Offset={Offset}",
            result.Topic, result.Partition, result.Offset);
    }

    // Batch send
    public async Task ProduceBatchAsync<T>(string topic, IEnumerable<(string Key, T Value)> items,
        CancellationToken ct = default)
    {
        var tasks = items.Select(item =>
        {
            var json = JsonSerializer.Serialize(item.Value);
            return _producer.ProduceAsync(topic,
                new Message<string, string> { Key = item.Key, Value = json }, ct);
        });
        await Task.WhenAll(tasks);
        _producer.Flush(TimeSpan.FromSeconds(10)); // drain anything still buffered
    }
}
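A hypothetical call site: keying by order id keeps all events for one order in the same partition, preserving their relative order.

// Hypothetical usage; "order" is an assumed domain object.
var producer = app.Services.GetRequiredService<KafkaProducerService>();
await producer.ProduceAsync(
    topic: "order-events",
    key: order.Id.ToString(), // same key → same partition → ordered per order
    value: new { OrderId = order.Id, Status = "Created" });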
// 2. Consumer service
public class KafkaConsumerService : BackgroundService
{
    private readonly IConfiguration _config;
    private readonly IServiceProvider _sp;
    private readonly ILogger<KafkaConsumerService> _logger;
    private readonly string _topic;
    private readonly string _groupId;

    public KafkaConsumerService(
        IConfiguration config,
        IServiceProvider sp,
        ILogger<KafkaConsumerService> logger,
        string topic,
        string groupId)
    {
        _config = config;
        _sp = sp;
        _logger = logger;
        _topic = topic;
        _groupId = groupId;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume() blocks the calling thread; yield first so host startup
        // is not held up by this loop.
        await Task.Yield();

        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _config["Kafka:BootstrapServers"],
            GroupId = _groupId,
            AutoOffsetReset = AutoOffsetReset.Earliest, // start from the earliest offset when none is committed
            EnableAutoCommit = false,                   // commit manually
            MaxPollIntervalMs = 300000,                 // max interval between polls
            SessionTimeoutMs = 30000,
            FetchMinBytes = 1024,                       // minimum bytes per fetch
            FetchWaitMaxMs = 500                        // max wait when FetchMinBytes is not yet available
        };

        using var consumer = new ConsumerBuilder<string, string>(consumerConfig)
            .SetErrorHandler((_, e) => _logger.LogError("Kafka error: {Error}", e.Reason))
            .SetPartitionsAssignedHandler((c, partitions) =>
            {
                _logger.LogInformation("Partitions assigned: {Partitions}",
                    string.Join(", ", partitions.Select(p => p.Partition)));
            })
            .Build();

        consumer.Subscribe(_topic);
        _logger.LogInformation("Consuming Topic: {Topic}, Group: {Group}", _topic, _groupId);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var result = consumer.Consume(stoppingToken);
                using var scope = _sp.CreateScope();
                await ProcessMessageAsync(scope.ServiceProvider, result, stoppingToken);
                // Commit the offset manually, only after successful processing
                consumer.Commit(result);
            }
            catch (ConsumeException ex)
            {
                _logger.LogError(ex, "Consume error: {Error}", ex.Error.Reason);
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to process message");
            }
        }
        consumer.Close(); // leave the group cleanly so rebalancing happens immediately
    }
    private async Task ProcessMessageAsync(IServiceProvider sp, ConsumeResult<string, string> result, CancellationToken ct)
    {
        var message = result.Message.Value;
        _logger.LogDebug("Consumed: Topic={Topic}, Partition={Partition}, Offset={Offset}",
            result.Topic, result.Partition, result.Offset);

        // Restore the trace context
        if (result.Message.Headers.TryGetLastBytes("traceparent", out var traceBytes))
        {
            var traceparent = Encoding.UTF8.GetString(traceBytes);
            // Restore the ActivityContext... (see the sketch after this class)
        }

        switch (result.Topic)
        {
            case "order-events":
                var orderEvent = JsonSerializer.Deserialize<OrderCreatedEvent>(message);
                var inventoryService = sp.GetRequiredService<IInventoryService>();
                await inventoryService.ReserveStockAsync(orderEvent!.OrderId, orderEvent.Items, ct);
                break;
        }
    }
}
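The trace-restoration stub above can be filled in with System.Diagnostics. A minimal sketch, assuming W3C-format traceparent headers; the ActivitySource name and method are hypothetical:

// Hypothetical helper; "KafkaConsumer" is an illustrative source name.
private static readonly ActivitySource Source = new("KafkaConsumer");

private static Activity? StartConsumeActivity(string traceparent)
{
    // Parse the producer's W3C traceparent and start a child span under it;
    // fall back to a root span if the header cannot be parsed.
    return ActivityContext.TryParse(traceparent, null, out var parentContext)
        ? Source.StartActivity("order-events process", ActivityKind.Consumer, parentContext)
        : Source.StartActivity("order-events process", ActivityKind.Consumer);
}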
// Registration (Program.cs)
builder.Services.AddSingleton<KafkaProducerService>();
builder.Services.AddHostedService(sp => new KafkaConsumerService(
    sp.GetRequiredService<IConfiguration>(),
    sp,
    sp.GetRequiredService<ILogger<KafkaConsumerService>>(),
    topic: "order-events",
    groupId: "inventory-service"));
RabbitMQ vs Kafka Comparison
Selection Guide
// RabbitMQ fits:
// - Task queues (sending email, generating reports)
// - RPC-style request/response
// - Complex routing (topic and header matching)
// - Moderate volume (tens of thousands of messages/sec)
// - Per-message acknowledgement
// Kafka fits:
// - Event stream processing (log collection, event sourcing)
// - High throughput (millions of messages/sec)
// - Replay (re-consume from an offset)
// - Big-data analytics pipelines
// - Ordering guarantees (per partition)
// Comparison:
// | Feature    | RabbitMQ                | Kafka                  |
// |------------|-------------------------|------------------------|
// | Throughput | ~tens of thousands/sec  | ~millions/sec          |
// | Latency    | microseconds            | milliseconds           |
// | Retention  | deleted once consumed   | by time/size           |
// | Ordering   | per queue               | per partition          |
// | Replay     | not supported           | supported (via offset) |
// | Routing    | rich (4 exchange types) | simple (topic only)    |
// | Protocol   | AMQP                    | custom binary over TCP |
Summary
RabbitMQ suits task queues and complex routing, using exchange types (Direct/Fanout/Topic) for flexible message distribution. Kafka suits high-throughput event streaming, scaling horizontally through partitions and consumer groups. For reliability, RabbitMQ relies on publisher confirms and manual acks, while Kafka relies on Acks.All plus manually committed offsets; dead-letter queues catch messages that repeatedly fail processing. As a rule of thumb, use RabbitMQ for task queues and Kafka for event streams, and combine the two when it helps.
Key Takeaways
- First place the topic in its chain: the request path, background-task path, or infrastructure path.
- Server-side topics are about more than functional correctness; stability, performance, and observability matter just as much.
- Evaluate any framework capability together with its configuration, lifecycle, exception propagation, and external dependencies.
- The heart of messaging is load leveling, decoupling, and asynchronous consistency, not merely "being able to send and receive".
Project Perspective
- Map the full path: request entry, business execution, external calls, logging, and error responses.
- Back critical paths with timeouts, retries, circuit breakers, tracing, and structured logs.
- Separate configuration from secrets and make the sources of per-environment differences explicit.
- Pin down the message contract, serialization format, idempotency key, and failure-compensation mechanism (see the dedup sketch below).
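Both brokers deliver at-least-once by default, so consumers must tolerate redelivery. A minimal idempotency sketch, assuming StackExchange.Redis as the store and a hypothetical key prefix; a real version would scope the TTL to the expected redelivery window:

using StackExchange.Redis; // assumed dependency

public static class IdempotencyGuard
{
    // Returns true the first time a message id is seen; false on redelivery.
    // SET <key> NX EX makes the check-and-mark a single atomic operation.
    public static async Task<bool> TryMarkProcessedAsync(IDatabase redis, string messageId)
    {
        return await redis.StringSetAsync(
            $"idempotency:{messageId}", // hypothetical key prefix
            "1",
            expiry: TimeSpan.FromHours(24),
            when: When.NotExists);
    }
}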
Common Pitfalls
- Stacking middleware and components without knowing their execution order in the chain.
- Ignoring lifecycles and runtime resource limits such as thread pools and connection pools.
- Drawing conclusions about performance or reliability without monitoring or tests.
- Treating a message queue as a drop-in replacement for synchronous RPC.
Going Further
- Dig deeper into runtime behavior, observability, release governance, and microservice coordination.
- Relate the topic to databases, caching, message queues, and authentication/authorization.
- Build team-level templates: unified exception handling, configuration conventions, and infrastructure wrappers.
- Keep filling in ordering, transactional messaging, the Outbox pattern, event sourcing, and stream governance.
When to Apply
- When you are ready to bring "Message Queues in Depth (RabbitMQ/Kafka)" into a project, first validate the critical path in an isolated module or minimal sample.
- Suited to API services, background tasks, real-time communication, authentication/authorization, and microservice collaboration.
- Once requirements touch stability, performance, observability, and release processes, this topic becomes an infrastructure capability.
Adoption Tips
- Define the request chain and failure paths first, then choose middleware, filters, service boundaries, and dependency styles.
- Add logs, metrics, tracing, timeouts, and retry policies to critical paths.
- Keep environment configuration and secrets separate; never hard-code production parameters into code or images.
Troubleshooting Checklist
- First locate the failure: routing, model binding, middleware, the business layer, or infrastructure.
- Check DI lifetimes, configuration sources, serialization rules, and the authentication context.
- Inspect thread pools, connection pools, cache hit rates, and external-dependency timeouts.
Review Questions
- If you brought "Message Queues in Depth (RabbitMQ/Kafka)" into your current project, which inputs, outputs, and failure paths would you verify first?
- At what scale and under which boundary conditions is it most likely to expose problems, and which metrics or logs would you use to confirm them?
- Compared with the default implementation or the alternatives, what are its biggest benefits and costs?
