整合 Kafka 消息流
大约 13 分钟约 3788 字
整合 Kafka 消息流
简介
在 ASP.NET Core 项目中整合 Kafka,重点不只是"把消息发出去",而是要围绕事件模型、分区策略、消费组、幂等处理、失败重试和可观测性建立一整套消息治理方式。Kafka 更适合做高吞吐事件流和异步解耦,而不是简单替代所有消息中间件场景。
Kafka 在架构中的位置
生产者(Producer) Kafka 集群 消费者(Consumer)
| | |
v v v
[订单服务] --事件发布--> [Topic: order-events] --消费--> [库存服务]
[支付服务] --事件发布--> [Topic: payment-events] --消费--> [通知服务]
[用户服务] --事件发布--> [Topic: user-events] --消费--> [分析服务]
关键概念:
- Broker: Kafka 服务节点
- Topic: 消息分类(逻辑概念)
- Partition: Topic 的物理分片(并行消费单位)
- Consumer Group: 消费者组(一个分区只被组内一个消费者消费)
- Offset: 消息在分区中的位置(消费进度标记)
- ISR: In-Sync Replicas(同步副本集合)Kafka vs 其他消息中间件
| 特性 | Kafka | RabbitMQ | Azure Service Bus |
|---|---|---|---|
| 吞吐量 | 极高(百万级/秒) | 中等(万级/秒) | 中等 |
| 消息模型 | 发布/订阅 + 持久化日志 | 队列 + 发布/订阅 | 队列 + 主题 |
| 消息回放 | 支持(按 offset) | 不支持 | 不支持 |
| 持久化 | 磁盘(高性能顺序写) | 内存/磁盘 | 磁盘 |
| 顺序保证 | 分区内有序 | 队列内有序 | 分区内有序 |
| 适用场景 | 事件流、日志、大数据 | 任务队列、RPC | 企业消息、事务 |
特点
实现
安装依赖
# NuGet 包
dotnet add package Confluent.Kafka
dotnet add package Microsoft.Extensions.HostingProducer:可靠生产与序列化
// ============================================
// Producer 配置
// ============================================
using Confluent.Kafka;
using System.Text.Json;
var producerConfig = new ProducerConfig
{
BootstrapServers = "localhost:9092",
// 确认级别:All = 所有 ISR 副本确认(最高可靠性)
Acks = Acks.All,
// 启用幂等生产(防止网络重试导致重复)
EnableIdempotence = true,
// 消息发送超时
MessageTimeoutMs = 10000,
// 批量发送延迟(降低请求频率,提高吞吐)
LingerMs = 20,
// 压缩算法(Lz4 平衡了压缩率和 CPU 开销)
CompressionType = CompressionType.Lz4,
// 重试次数
Retries = 3,
// 重试间隔(指数退避)
// RetryBackoffMs = 100 // 默认 100ms
// 安全认证(SASL)
// SecurityProtocol = SecurityProtocol.SaslSsl,
// SaslMechanism = SaslMechanism.Plain,
// SaslUsername = "your-username",
// SaslPassword = "your-password"
};
// 注册为单例
builder.Services.AddSingleton<IProducer<string, string>>(
new ProducerBuilder<string, string>(producerConfig).Build());// ============================================
// 事件定义
// ============================================
/// <summary>
/// 订单创建事件 — 表达"订单已创建"这个事实
/// </summary>
public record OrderCreatedEvent(
string OrderId,
long UserId,
decimal TotalAmount,
string Currency,
List<OrderItem> Items,
DateTimeOffset CreatedAt,
int SchemaVersion = 1);
public record OrderItem(
string ProductId,
string ProductName,
int Quantity,
decimal UnitPrice);
/// <summary>
/// 支付成功事件
/// </summary>
public record PaymentSucceededEvent(
string PaymentId,
string OrderId,
decimal Amount,
string Currency,
DateTimeOffset PaidAt,
string Method,
int SchemaVersion = 1);
// ============================================
// 事件生产者
// ============================================
public class OrderEventProducer
{
private readonly IProducer<string, string> _producer;
private readonly ILogger<OrderEventProducer> _logger;
private readonly string _topic;
public OrderEventProducer(
IProducer<string, string> producer,
ILogger<OrderEventProducer> logger,
IConfiguration configuration)
{
_producer = producer;
_logger = logger;
_topic = configuration["Kafka:Topics:OrderEvents"] ?? "order-events";
}
public async Task PublishAsync(
OrderCreatedEvent evt,
CancellationToken cancellationToken = default)
{
var message = new Message<string, string>
{
// Key 决定消息落在哪个分区
// 使用 OrderId 确保同一订单的事件在同一个分区(有序)
Key = evt.OrderId,
Value = JsonSerializer.Serialize(evt),
Headers = new Headers
{
{ "eventType", System.Text.Encoding.UTF8.GetBytes(nameof(OrderCreatedEvent)) },
{ "schemaVersion", System.Text.Encoding.UTF8.GetBytes(evt.SchemaVersion.ToString()) },
{ "producedAt", System.Text.Encoding.UTF8.GetBytes(DateTimeOffset.UtcNow.ToString("O")) }
}
};
try
{
var result = await _producer.ProduceAsync(_topic, message, cancellationToken);
_logger.LogInformation(
"Kafka 消息发布成功. Topic={Topic}, Partition={Partition}, " +
"Offset={Offset}, OrderId={OrderId}, LatencyMs={LatencyMs}",
result.Topic,
result.Partition,
result.Offset,
evt.OrderId,
result.Message.Timestamp.UnixTimestampMs);
}
catch (ProduceException<string, string> ex)
{
_logger.LogError(ex,
"Kafka 消息发布失败. Topic={Topic}, OrderId={OrderId}, Error={Error}",
_topic, evt.OrderId, ex.Error.Reason);
throw;
}
}
/// <summary>
/// 批量发布事件
/// </summary>
public async Task PublishBatchAsync(
IEnumerable<OrderCreatedEvent> events,
CancellationToken cancellationToken = default)
{
var tasks = events.Select(evt => PublishAsync(evt, cancellationToken));
await Task.WhenAll(tasks);
}
}生产端关键点:
- key 决定消息落在哪个分区
- 同一个 key 常用于保证某类业务实体在单分区内有序
- 事件结构要版本化,不能随意破坏兼容性
- Acks.All + EnableIdempotence = 最高可靠性但略降低吞吐
- LingerMs 控制批量发送的等待时间,越大吞吐越高但延迟越高Consumer:后台消费与手动提交 offset
// ============================================
// 消费者配置
// ============================================
using Confluent.Kafka;
using System.Text.Json;
public class OrderEventConsumer : BackgroundService
{
private readonly ILogger<OrderEventConsumer> _logger;
private readonly IServiceScopeFactory _scopeFactory;
private readonly IConfiguration _configuration;
public OrderEventConsumer(
ILogger<OrderEventConsumer> logger,
IServiceScopeFactory scopeFactory,
IConfiguration configuration)
{
_logger = logger;
_scopeFactory = scopeFactory;
_configuration = configuration;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);
}
private void ConsumeLoop(CancellationToken stoppingToken)
{
var config = new ConsumerConfig
{
BootstrapServers = _configuration["Kafka:BootstrapServers"] ?? "localhost:9092",
GroupId = "order-processor-group",
AutoOffsetReset = AutoOffsetReset.Earliest, // 从最早的消息开始
EnableAutoCommit = false, // 手动提交 offset
MaxPollIntervalMs = 300000, // 最大处理时间 5 分钟
SessionTimeoutMs = 10000, // 会话超时 10 秒
HeartbeatIntervalMs = 3000, // 心跳间隔 3 秒
IsolationLevel = IsolationLevel.ReadCommitted // 只读已提交的消息
};
using var consumer = new ConsumerBuilder<string, string>(config)
.SetErrorHandler((_, e) =>
_logger.LogError("Kafka 消费错误: Code={Code}, Reason={Reason}", e.Code, e.Reason))
.SetPartitionsRevokedHandler((_, e) =>
_logger.LogWarning("Kafka 分区被回收: {Partitions}", string.Join(", ", e.Partitions)))
.SetPartitionsAssignedHandler((_, e) =>
_logger.LogInformation("Kafka 分区被分配: {Partitions}", string.Join(", ", e.Partitions)))
.Build();
consumer.Subscribe("order-events");
_logger.LogInformation("Kafka 消费者已启动. Group={Group}, Topic=order-events", config.GroupId);
try
{
while (!stoppingToken.IsCancellationRequested)
{
var consumeResult = consumer.Consume(stoppingToken);
try
{
ProcessMessage(consumeResult);
consumer.Commit(consumeResult);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Kafka 消息处理失败. Topic={Topic}, Partition={Partition}, Offset={Offset}",
consumeResult.Topic, consumeResult.Partition, consumeResult.Offset);
// 处理失败的消息可以发送到死信 Topic
await SendToDeadLetterAsync(consumeResult, ex);
}
}
}
catch (OperationCanceledException)
{
_logger.LogInformation("Kafka 消费者正在关闭...");
}
finally
{
consumer.Close();
_logger.LogInformation("Kafka 消费者已关闭");
}
}
private void ProcessMessage(ConsumeResult<string, string> consumeResult)
{
var messageType = consumeResult.Message.Headers
.FirstOrDefault(h => h.Key == "eventType")?.GetValueString()
?? "Unknown";
_logger.LogInformation(
"收到 Kafka 消息. Topic={Topic}, Partition={Partition}, Offset={Offset}, " +
"Type={Type}, Key={Key}",
consumeResult.Topic, consumeResult.Partition, consumeResult.Offset,
messageType, consumeResult.Message.Key);
switch (messageType)
{
case nameof(OrderCreatedEvent):
var orderEvent = JsonSerializer.Deserialize<OrderCreatedEvent>(consumeResult.Message.Value);
if (orderEvent != null)
{
ProcessOrderCreated(orderEvent);
}
break;
default:
_logger.LogWarning("未知消息类型: {Type}", messageType);
break;
}
}
private void ProcessOrderCreated(OrderCreatedEvent evt)
{
// 使用 DI 作用域获取服务
using var scope = _scopeFactory.CreateScope();
var orderService = scope.ServiceProvider.GetRequiredService<IOrderProcessingService>();
orderService.ProcessNewOrder(evt);
}
private async Task SendToDeadLetterAsync(ConsumeResult<string, string> result, Exception ex)
{
try
{
using var scope = _scopeFactory.CreateScope();
var producer = scope.ServiceProvider.GetRequiredService<IProducer<string, string>>();
var deadLetterMessage = new Message<string, string>
{
Key = result.Message.Key,
Value = JsonSerializer.Serialize(new
{
OriginalTopic = result.Topic,
OriginalPartition = result.Partition,
OriginalOffset = result.Offset,
OriginalValue = result.Message.Value,
ErrorType = ex.GetType().Name,
ErrorMessage = ex.Message,
FailedAt = DateTimeOffset.UtcNow
}),
Headers = new Headers
{
{ "originalTopic", System.Text.Encoding.UTF8.GetBytes(result.Topic) },
{ "errorType", System.Text.Encoding.UTF8.GetBytes(ex.GetType().Name) }
}
};
await producer.ProduceAsync("order-events-dlq", deadLetterMessage);
_logger.LogInformation("消息已发送到死信 Topic");
}
catch (Exception dlqEx)
{
_logger.LogError(dlqEx, "发送到死信 Topic 失败");
}
}
}消费端关键点:
- 自动提交简单,但容易产生"业务失败但 offset 已提交"的问题
- 手动提交更安全,但需要自己设计失败与重试策略
- 幂等处理通常比"刚好消费一次"更现实
- EnableAutoCommit = false + 手动 Commit = 可靠消费
- MaxPollIntervalMs 要大于最大消息处理时间配置化 Kafka 设置
// appsettings.json
{
"Kafka": {
"BootstrapServers": "localhost:9092",
"Topics": {
"OrderEvents": "order-events",
"PaymentEvents": "payment-events",
"DeadLetter": "order-events-dlq"
},
"Producer": {
"Acks": "All",
"EnableIdempotence": true,
"LingerMs": 20,
"CompressionType": "Lz4",
"Retries": 3
},
"Consumer": {
"GroupId": "order-processor-group",
"AutoOffsetReset": "Earliest",
"EnableAutoCommit": false,
"MaxPollIntervalMs": 300000,
"SessionTimeoutMs": 10000
}
}
}// 从配置读取 Kafka 设置
var kafkaConfig = builder.Configuration.GetSection("Kafka");
var producerConfig = new ProducerConfig
{
BootstrapServers = kafkaConfig["BootstrapServers"],
Acks = Enum.Parse<Acks>(kafkaConfig["Producer:Acks"] ?? "All"),
EnableIdempotence = kafkaConfig.GetValue<bool>("Producer:EnableIdempotence", true),
LingerMs = kafkaConfig.GetValue<int>("Producer:LingerMs", 20),
CompressionType = Enum.Parse<CompressionType>(kafkaConfig["Producer:CompressionType"] ?? "Lz4"),
Retries = kafkaConfig.GetValue<int>("Producer:Retries", 3)
};Topic / Consumer Group / 事件建模
Topic 设计建议:
1. 按业务域划分(推荐)
- order-events (订单事件)
- payment-events (支付事件)
- inventory-events (库存事件)
- user-events (用户事件)
- notification-events (通知事件)
2. 不要按代码模块命名
- x order-controller-events (错误)
- x payment-service-queue (错误)
3. 事件粒度
- 细粒度:每种事件一个 Topic(order-created, order-cancelled)
- 粗粒度:一个域一个 Topic,用 eventType 头区分
- 推荐:粗粒度 + eventType 头,减少 Topic 数量
Consumer Group 设计:
- order-processor-group (订单处理服务)
- notification-group (通知服务)
- analytics-group (分析服务)
- search-indexer-group (搜索索引服务)// ============================================
// 事件 Schema 版本化
// ============================================
// V1 事件
public record OrderCreatedEvent(
string OrderId,
long UserId,
decimal TotalAmount,
List<OrderItem> Items,
DateTimeOffset CreatedAt,
int SchemaVersion = 1);
// V2 事件(向后兼容 — 新增字段)
public record OrderCreatedEventV2(
string OrderId,
long UserId,
decimal TotalAmount,
List<OrderItem> Items,
DateTimeOffset CreatedAt,
// V2 新增字段
string Currency,
string ShippingAddress,
int SchemaVersion = 2) : OrderCreatedEvent(
OrderId, UserId, TotalAmount, Items, CreatedAt, SchemaVersion);
// 消费者兼容性处理
public object? DeserializeEvent(string json, int schemaVersion)
{
return schemaVersion switch
{
1 => JsonSerializer.Deserialize<OrderCreatedEvent>(json),
2 => JsonSerializer.Deserialize<OrderCreatedEventV2>(json),
_ => throw new NotSupportedException($"Unknown schema version: {schemaVersion}")
};
}幂等处理
// ============================================
// 数据库幂等表
// ============================================
public class ProcessedMessage
{
public string MessageId { get; set; } = string.Empty; // Topic-Partition-Offset
public string EventType { get; set; } = string.Empty;
public string EventKey { get; set; } = string.Empty;
public DateTimeOffset ProcessedAt { get; set; }
}
// 使用数据库保证幂等
public class IdempotentEventProcessor
{
private readonly AppDbContext _context;
public IdempotentEventProcessor(AppDbContext context)
{
_context = context;
}
public async Task<bool> TryProcessAsync<T>(
ConsumeResult<string, string> consumeResult,
Func<T, Task> handler) where T : class
{
var messageId = $"{consumeResult.Topic}-{consumeResult.Partition.Value}-{consumeResult.Offset.Value}";
// 检查是否已处理
var alreadyProcessed = await _context.ProcessedMessages
.AnyAsync(m => m.MessageId == messageId);
if (alreadyProcessed)
{
return false; // 跳过重复消息
}
// 反序列化并处理
var evt = JsonSerializer.Deserialize<T>(consumeResult.Message.Value);
if (evt != null)
{
await handler(evt);
// 标记为已处理
_context.ProcessedMessages.Add(new ProcessedMessage
{
MessageId = messageId,
EventType = typeof(T).Name,
EventKey = consumeResult.Message.Key,
ProcessedAt = DateTimeOffset.UtcNow
});
await _context.SaveChangesAsync();
}
return true;
}
}
// 使用示例
await _idempotentProcessor.TryProcessAsync<OrderCreatedEvent>(
consumeResult,
evt => _orderService.ProcessNewOrderAsync(evt));带重试的消费策略
// ============================================
// 指数退避重试
// ============================================
public class RetryPolicy
{
public int MaxRetries { get; init; } = 3;
public TimeSpan InitialDelay { get; init; } = TimeSpan.FromSeconds(1);
public TimeSpan MaxDelay { get; init; } = TimeSpan.FromSeconds(30);
public double BackoffMultiplier { get; init; } = 2;
public async Task<T> ExecuteAsync<T>(Func<Task<T>> action)
{
var attempt = 0;
var delay = InitialDelay;
while (true)
{
try
{
return await action();
}
catch (Exception ex) when (IsTransient(ex) && attempt < MaxRetries)
{
attempt++;
delay = TimeSpan.FromTicks(
Math.Min(delay.Ticks * (long)BackoffMultiplier, MaxDelay.Ticks));
_logger.LogWarning(ex,
"操作失败,{Delay}ms 后重试(第 {Attempt}/{MaxRetries} 次)",
delay.TotalMilliseconds, attempt, MaxRetries);
await Task.Delay(delay);
}
}
}
private static bool IsTransient(Exception ex)
{
// 判断是否为瞬时错误(网络超时、连接断开等)
return ex is TimeoutException
or OperationCanceledException
or HttpRequestException;
}
}Outbox 模式 — 保证事件可靠发布
// ============================================
// Outbox 表 — 保证数据库操作和事件发布的原子性
// ============================================
public class OutboxMessage
{
public long Id { get; set; }
public string AggregateType { get; set; } = string.Empty;
public string AggregateId { get; set; } = string.Empty;
public string EventType { get; set; } = string.Empty;
public string Payload { get; set; } = string.Empty; // JSON 序列化的事件
public string TargetTopic { get; set; } = string.Empty;
public OutboxStatus Status { get; set; } = OutboxStatus.Pending;
public int RetryCount { get; set; }
public DateTimeOffset? ProcessedAt { get; set; }
public string? ErrorMessage { get; set; }
public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;
}
public enum OutboxStatus
{
Pending,
Processing,
Completed,
Failed
}
// ============================================
// 在业务事务中写入 Outbox
// ============================================
public class OrderService
{
private readonly AppDbContext _context;
public async Task<string> CreateOrderAsync(CreateOrderRequest request)
{
using var transaction = await _context.Database.BeginTransactionAsync();
// 1. 创建订单
var order = new Order
{
Id = Guid.NewGuid().ToString(),
UserId = request.UserId,
TotalAmount = request.TotalAmount,
Status = "Created"
};
_context.Orders.Add(order);
// 2. 写入 Outbox(同一事务)
var outboxMessage = new OutboxMessage
{
AggregateType = "Order",
AggregateId = order.Id,
EventType = nameof(OrderCreatedEvent),
Payload = JsonSerializer.Serialize(new OrderCreatedEvent(
order.Id, request.UserId, request.TotalAmount,
"CNY", new List<OrderItem>(), DateTimeOffset.UtcNow)),
TargetTopic = "order-events",
Status = OutboxStatus.Pending
};
_context.OutboxMessages.Add(outboxMessage);
await _context.SaveChangesAsync();
await transaction.CommitAsync();
return order.Id;
}
}
// ============================================
// Outbox 发布器 — 后台服务定期扫描并发布
// ============================================
public class OutboxPublisher : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<OutboxPublisher> _logger;
private readonly TimeSpan _pollInterval = TimeSpan.FromSeconds(5);
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await PublishPendingMessagesAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Outbox 发布失败");
}
await Task.Delay(_pollInterval, stoppingToken);
}
}
private async Task PublishPendingMessagesAsync(CancellationToken stoppingToken)
{
using var scope = _scopeFactory.CreateScope();
var context = scope.ServiceProvider.GetRequiredService<AppDbContext>();
var producer = scope.ServiceProvider.GetRequiredService<IProducer<string, string>>();
// 批量获取待发布消息
var messages = await context.OutboxMessages
.Where(m => m.Status == OutboxStatus.Pending)
.OrderBy(m => m.CreatedAt)
.Take(100)
.ToListAsync(stoppingToken);
foreach (var message in messages)
{
try
{
message.Status = OutboxStatus.Processing;
await context.SaveChangesAsync(stoppingToken);
var kafkaMessage = new Message<string, string>
{
Key = message.AggregateId,
Value = message.Payload,
Headers = new Headers
{
{ "eventType", System.Text.Encoding.UTF8.GetBytes(message.EventType) }
}
};
await producer.ProduceAsync(message.TargetTopic, kafkaMessage, stoppingToken);
message.Status = OutboxStatus.Completed;
message.ProcessedAt = DateTimeOffset.UtcNow;
}
catch (Exception ex)
{
message.Status = OutboxStatus.Failed;
message.RetryCount++;
message.ErrorMessage = ex.Message;
if (message.RetryCount >= 5)
{
_logger.LogError(
"Outbox 消息达到最大重试次数: Id={Id}, Type={Type}",
message.Id, message.EventType);
}
}
await context.SaveChangesAsync(stoppingToken);
}
}
}可观测性
Kafka 监控指标
/// <summary>
/// Kafka 消费者健康检查
/// </summary>
public class KafkaHealthCheck : IHealthCheck
{
private readonly IConsumer<string, string> _consumer;
public KafkaHealthCheck(IConsumer<string, string> consumer)
{
_consumer = consumer;
}
public Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken cancellationToken = default)
{
try
{
// 检查消费者是否连接正常
var metadata = _consumer.ListTopics(TimeSpan.FromSeconds(5));
return Task.FromResult(HealthCheckResult.Healthy(
$"Kafka 连接正常. Topics: {metadata.Topics.Count}"));
}
catch (Exception ex)
{
return Task.FromResult(HealthCheckResult.Unhealthy(
"Kafka 连接异常", ex));
}
}
}
// 注册健康检查
builder.Services.AddHealthChecks()
.AddCheck<KafkaHealthCheck>("kafka", tags: new[] { "messaging" });关键监控指标
生产端指标:
- 发送成功率 / 失败率
- 发送延迟(P50/P95/P99)
- 批量大小和频率
- 压缩率
消费端指标:
- 消费 Lag(消费者落后于生产者的消息数)
- 消费速率(每秒处理的消息数)
- 处理延迟(消息从产生到被处理的时间)
- 错误率 / 重试率
- 死信消息数量
Kafka 集群指标:
- Broker 在线数量
- Partition 数量和分布
- ISR 副本数量
- 磁盘使用率
- 网络吞吐量优点
缺点
总结
在 ASP.NET Core 中接 Kafka,关键不是"连上了"而是"治理好"。真正落地时要先明确事件边界、分区策略、消费组关系和失败处理方式,再决定如何生产、消费和扩展,否则很容易把异步系统做成黑盒。生产端要配置 Acks.All + EnableIdempotence 保证可靠性,消费端要手动提交 offset + 幂等处理 + 死信 Topic 保证正确性,Outbox 模式保证数据库操作和事件发布的原子性。
关键知识点
- Topic 表达的是业务事件域,不是代码模块名。
- key 决定分区和局部有序性。
- offset 提交时机直接关系到重复消费和丢消息风险。
- Kafka 适合事件流,不代表适合所有队列类需求。
项目落地视角
- 订单、支付、库存、用户行为都很适合事件化。
- 后台消费可用于构建读模型、索引、报表和通知。
- 关键链路必须监控 lag、失败率、重试次数和死信数量。
- 微服务架构中 Kafka 更适合作为领域事件总线,而不是事务协调器。
常见误区
- 把数据库实体直接当事件发出,后续难演进。
- 消费失败后既不重试也不死信,消息悄悄丢掉。
- 没有幂等策略,却期待"永远恰好一次"。
- Topic 和分区数量完全拍脑袋,不看业务模型和消费关系。
- 忽略消费者 Lag 监控,直到消息大量堆积才发现。
进阶路线
- 深入学习 Kafka 分区、副本、ISR 和 Lag 监控。
- 学习 Schema Registry、Avro / Protobuf 事件版本治理。
- 结合 Outbox / CDC 做可靠事件发布。
- 与 Flink、Spark、ClickHouse 等实时链路整合。
适用场景
- 微服务异步解耦。
- 订单 / 支付 / 库存领域事件广播。
- 埋点日志与行为数据管道。
- 需要回放、重建、补偿的事件流场景。
落地建议
- 从少量清晰 Topic 开始,不要一开始就建一堆主题。
- 每个消费者都明确:幂等键、重试策略、死信策略。
- 把关键事件结构版本化,不要破坏兼容。
- 为 Kafka 接入建立独立监控面板,不要只靠业务日志观察。
排错清单
- 发不出消息:检查 broker、topic、网络、认证和序列化。
- 收不到消息:检查 group、offset、订阅关系和 topic 名。
- 重复消费:检查 commit 时机和幂等实现。
- 消费堆积:检查 partition 数、处理耗时和下游依赖。
