Kafka 消息队列
大约 14 分钟约 4082 字
Kafka 消息队列
简介
Kafka 是一个分布式消息与流处理平台,核心价值在于:高吞吐、可持久化、可水平扩展、适合事件流和异步解耦。它非常适合日志采集、事件驱动架构、埋点流水、订单异步处理、数据同步和实时计算场景,但前提是要同时理解 Topic、Partition、Offset、Consumer Group 和副本机制这些核心概念。Kafka 最初由 LinkedIn 开发,2011 年开源并成为 Apache 顶级项目,目前被广泛应用于大数据和微服务架构中。
特点
实现
核心概念与基础命令
核心概念:
- Broker:Kafka 节点
- Topic:消息逻辑主题
- Partition:Topic 的物理分区
- Offset:分区内消息位置编号
- Producer:生产者
- Consumer:消费者
- Consumer Group:消费者组
- Replica:副本核心概念详解
====== Broker ======
Kafka 集群由多个 Broker 组成,每个 Broker 就是一个 Kafka 服务进程。
- Broker ID 唯一标识一个节点
- 多个 Broker 组成集群,通过 ZooKeeper 或 KRaft 协调
- 一个 Broker 可以承载多个 Partition
====== Topic ======
Topic 是消息的逻辑分类,类似于数据库中的表。
- 生产者将消息发送到 Topic
- 消费者从 Topic 订阅消息
- Topic 可以配置分区数和副本数
====== Partition ======
Partition 是 Topic 的物理分片,是 Kafka 并发的基本单位。
- 每个 Partition 是一个有序的、不可变的日志
- 消息在 Partition 中有唯一的 Offset
- 分区内有序,分区间无全局顺序
- 一个 Partition 只能被同一消费组中的一个消费者消费
- 想要提高消费并发,需要增加 Partition 数量
====== Offset ======
Offset 是消息在 Partition 中的位置编号,从 0 开始递增。
- 消费者通过 Offset 跟踪消费进度
- 支持重置 Offset 实现消息回放
- __consumer_offsets Topic 存储消费组 Offset
====== Consumer Group ======
消费者组是 Kafka 实现消息分发和负载均衡的核心机制。
- 同一消费组内,一条消息只被一个消费者消费(负载均衡)
- 不同消费组可以独立消费同一 Topic 的消息(广播)
- 消费者数量变化时自动触发 Rebalance
- 消费者数量不应超过 Partition 数量
====== Replica ======
副本用于保证数据高可用。
- Leader Replica:处理读写请求
- Follower Replica:同步 Leader 数据,不处理请求
- ISR(In-Sync Replicas):与 Leader 保持同步的副本集合
- LEO(Log End Offset):日志末端偏移量
- HW(High Watermark):所有 ISR 副本都已复制的偏移量# 查看 Kafka 版本(示意)
cd /usr/local/kafka
bin/kafka-topics.sh --version
# 创建 Topic
bin/kafka-topics.sh --create \
--topic order-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# 查看 Topic 列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看 Topic 详情
bin/kafka-topics.sh --describe --topic order-events --bootstrap-server localhost:9092Topic 管理详解
# ====== 创建 Topic ======
# 基本创建
bin/kafka-topics.sh --create \
--topic order-events \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# 创建带配置的 Topic
bin/kafka-topics.sh --create \
--topic order-events \
--bootstrap-server localhost:9092 \
--partitions 6 \
--replication-factor 2 \
--config retention.ms=604800000 \
--config cleanup.policy=delete \
--config compression.type=lz4 \
--config max.message.bytes=10485760
# ====== 查看 Topic ======
# 列出所有 Topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# 查看 Topic 详细信息
bin/kafka-topics.sh --describe --topic order-events --bootstrap-server localhost:9092
# 查看所有 Topic 的详细信息
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092
# ====== 修改 Topic ======
# 增加分区数(注意:分区数只能增加不能减少)
bin/kafka-topics.sh --alter \
--topic order-events \
--bootstrap-server localhost:9092 \
--partitions 12
# 修改 Topic 配置
bin/kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--entity-type topics --entity-name order-events \
--add-config retention.ms=259200000
# ====== 删除 Topic ======
bin/kafka-topics.sh --delete \
--topic order-events \
--bootstrap-server localhost:9092
# ====== Topic 配置说明 ======
# retention.ms: 消息保留时间(毫秒),默认 7 天
# retention.bytes: 消息保留大小限制
# cleanup.policy: 清理策略,delete(删除过期消息)或 compact(保留最新 key)
# max.message.bytes: 单条消息最大字节数,默认 1MB
# compression.type: 压缩类型,none/gzip/snappy/lz4/zstd
# min.insync.replicas: 最小同步副本数(配合 acks=all 使用)
# segment.bytes: Segment 文件大小,默认 1GB
# segment.ms: Segment 滚动时间# 控制台生产者
bin/kafka-console-producer.sh --topic order-events --bootstrap-server localhost:9092
> {"orderId":1001,"status":"created"}
> {"orderId":1002,"status":"paid"}
# 控制台消费者
bin/kafka-console-consumer.sh --topic order-events --from-beginning --bootstrap-server localhost:9092生产者与消费者命令详解
# ====== 控制台生产者 ======
# 基本用法
bin/kafka-console-producer.sh --topic order-events --bootstrap-server localhost:9092
# 指定 Key(相同 Key 的消息会发到同一分区)
bin/kafka-console-producer.sh --topic order-events --bootstrap-server localhost:9092 \
--property parse.key=true --property key.separator=,
# 指定分区
bin/kafka-console-producer.sh --topic order-events --bootstrap-server localhost:9092 \
--property parse.key=true --property key.separator=,
# 从文件读取消息
bin/kafka-console-producer.sh --topic order-events --bootstrap-server localhost:9092 \
< messages.txt
# ====== 控制台消费者 ======
# 从最早的消息开始消费
bin/kafka-console-consumer.sh --topic order-events \
--from-beginning --bootstrap-server localhost:9092
# 从最新的消息开始消费
bin/kafka-console-consumer.sh --topic order-events \
--bootstrap-server localhost:9092
# 指定消费组
bin/kafka-console-consumer.sh --topic order-events \
--group order-service-group \
--bootstrap-server localhost:9092
# 从指定 Offset 开始消费
bin/kafka-console-consumer.sh --topic order-events \
--partition 0 --offset 100 \
--bootstrap-server localhost:9092
# 指定消费数量后退出
bin/kafka-console-consumer.sh --topic order-events \
--max-messages 10 \
--bootstrap-server localhost:9092
# 查看消息的 Key 和时间戳
bin/kafka-console-consumer.sh --topic order-events \
--from-beginning \
--property print.key=true \
--property key.separator=" -> " \
--property print.timestamp=true \
--bootstrap-server localhost:9092
# ====== 消费组管理 ======
# 列出所有消费组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 查看消费组详情
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group order-service-group
# 查看所有消费组的详情
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--all-groups --all-topics --describe
# 重置消费组 Offset
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-service-group \
--topic order-events \
--reset-offsets --to-earliest \
--execute
# 重置到最新 Offset
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-service-group \
--topic order-events \
--reset-offsets --to-latest \
--execute
# 重置到指定时间
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-service-group \
--topic order-events \
--reset-offsets --to-datetime "2024-01-01T00:00:00.000" \
--execute
# 删除消费组
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-service-group --deletePartition 与并发关系:
- 一个分区同一时刻只能被同一消费组中的一个消费者消费
- 想提高组内并发,通常要增加分区数
- 分区内有序,不同分区之间全局无序Linux 安装与服务管理(现代化示意)
# 下载并解压 Kafka(示意版本)
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz -C /usr/local
mv /usr/local/kafka_2.13-3.7.0 /usr/local/kafka完整安装指南
# ====== 安装 JDK ======
yum install -y java-11-openjdk java-11-openjdk-devel
java -version
# ====== 下载 Kafka ======
cd /usr/local
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
# 如果下载慢,使用镜像
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.7.0/kafka_2.13-3.7.0.tgz
# ====== 解压安装 ======
tar -xzf kafka_2.13-3.7.0.tgz
mv kafka_2.13-3.7.0 kafka
cd kafka
# ====== 创建数据和日志目录 ======
mkdir -p /data/kafka/logs
mkdir -p /var/log/kafka
# ====== 创建 Kafka 用户 ======
useradd -r -s /sbin/nologin kafka
chown -R kafka:kafka /usr/local/kafka
chown -R kafka:kafka /data/kafka
chown -R kafka:kafka /var/log/kafka# KRaft 模式初始化(新版本推荐,不再依赖 ZooKeeper)
cd /usr/local/kafka
bin/kafka-storage.sh random-uuid
# 假设得到 cluster id: XxYyZz123
bin/kafka-storage.sh format -t XxYyZz123 -c config/kraft/server.propertiesKRaft 配置详解
# ====== KRaft 模式配置 ======
# config/kraft/server.properties
# 节点配置
node.id=1
process.roles=broker,controller
controller.quorum.voters=1@localhost:9093
controller.listener.names=CONTROLLER
# 监听器配置
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://localhost:9092
# 数据目录
log.dirs=/data/kafka/logs
# 集群配置(多节点)
# node.id=1
# process.roles=broker,controller
# controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093
# listeners=PLAINTEXT://:9092,CONTROLLER://:9093
# advertised.listeners=PLAINTEXT://host1:9092
# ====== ZooKeeper 模式配置(旧版本) ======
# config/server.properties
# broker.id=0
# listeners=PLAINTEXT://:9092
# advertised.listeners=PLAINTEXT://host1:9092
# zookeeper.connect=host1:2181,host2:2181,host3:2181
# log.dirs=/data/kafka/logs
# num.partitions=3
# default.replication.factor=2
# min.insync.replicas=1
# log.retention.hours=168
# log.segment.bytes=1073741824
# log.retention.check.interval.ms=300000
# auto.create.topics.enable=true
# delete.topic.enable=true# 启动 Kafka(KRaft)
bin/kafka-server-start.sh -daemon config/kraft/server.properties
# 查看端口
ss -lntp | grep 9092启动与停止
# ====== 前台启动(调试用) ======
bin/kafka-server-start.sh config/kraft/server.properties
# ====== 后台启动 ======
bin/kafka-server-start.sh -daemon config/kraft/server.properties
# ====== 停止 Kafka ======
bin/kafka-server-stop.sh
# ====== 验证启动 ======
# 检查端口
ss -tlnp | grep 9092
ss -tlnp | grep 9093
# 检查日志
tail -f /data/kafka/logs/server.log
# 创建测试 Topic 验证
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-topics.sh --list --bootstrap-server localhost:9092# systemd 示例:/etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka
After=network.target
[Service]
Type=simple
User=kafka
Group=kafka
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/kraft/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure
LimitNOFILE=100000
[Install]
WantedBy=multi-user.targetSystemd 服务管理
# ====== 启用并管理 Kafka 服务 ======
systemctl daemon-reload
systemctl enable kafka
systemctl start kafka
systemctl status kafka
systemctl restart kafka
systemctl stop kafka
# 查看日志
journalctl -u kafka -f
journalctl -u kafka --since "1 hour ago"
# ====== JVM 参数优化 ======
# 在 systemd 中设置 JVM 参数
# [Service]
# Environment="KAFKA_HEAP_OPTS=-Xmx4G -Xms4G"
# Environment="KAFKA_JVM_PERFORMANCE_OPTS=-XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
# 或在启动脚本中设置
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20"Producer / Consumer 生产实践思路
using Confluent.Kafka;
using System.Text.Json;
var producerConfig = new ProducerConfig
{
BootstrapServers = "localhost:9092",
Acks = Acks.All,
EnableIdempotence = true,
MessageTimeoutMs = 10000,
LingerMs = 20,
CompressionType = CompressionType.Lz4
};
using var producer = new ProducerBuilder<string, string>(producerConfig).Build();
var evt = new
{
OrderId = 1001,
Status = "Created",
CreatedAt = DateTimeOffset.UtcNow
};
var result = await producer.ProduceAsync("order-events", new Message<string, string>
{
Key = evt.OrderId.ToString(),
Value = JsonSerializer.Serialize(evt)
});
Console.WriteLine($"Partition={result.Partition}, Offset={result.Offset}");using Confluent.Kafka;
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "order-service-group",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false
};
using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
consumer.Subscribe("order-events");
while (true)
{
var record = consumer.Consume();
Console.WriteLine(record.Message.Value);
// 业务处理成功后再提交 offset
consumer.Commit(record);
}消费者核心实践:
- 自动提交简单但不安全
- 手动提交更适合重要业务
- 如果处理失败,要明确:重试、跳过、死信还是人工介入Topic 设计与排障
Topic 设计建议:
- 按业务事件域命名:order-events、payment-events、inventory-events
- 不要按"某个页面"或"某段临时逻辑"命名 Topic
- Topic 粒度要服务于消费关系,而不是代码模块名Topic 设计最佳实践
====== Topic 命名规范 ======
- 格式:{domain}.{entity}.{event-type}
- 示例:order.order-created
- 示例:payment.payment-completed
- 示例:inventory.stock-reserved
====== 分区数设计 ======
- 分区数 = 目标最大消费并发数
- 建议初始 6-12 个分区,根据业务增长调整
- 分区数不宜过多(单 Broker 建议不超过 2000 个分区)
- 分区数应该是消费组消费者数量的倍数
====== 副本数设计 ======
- 生产环境建议 replication-factor >= 2
- 重要业务建议 replication-factor = 3
- min.insync.replicas = replication-factor - 1
====== 消息保留策略 ======
- 事件流场景:retention.ms=604800000(7 天)
- 日志场景:retention.ms=259200000(3 天)
- 补偿场景:retention.ms=86400000(1 天)
- 紧凑型 Topic(最新 key):cleanup.policy=compact
====== 消息 Key 设计 ======
- 使用业务 ID 作为 Key(如 orderId)
- 相同 Key 的消息保证在同一个分区
- Key 为 null 时消息随机分配到分区# 查看消费组状态
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group order-service-group
# 查看偏移量积压(Lag)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --all-groups --all-topics --describe排障命令详解
# ====== 消息生产测试 ======
# 发送测试消息
echo "test-message-$(date +%s)" | bin/kafka-console-producer.sh \
--topic order-events --bootstrap-server localhost:9092
# 验证消息是否可消费
bin/kafka-console-consumer.sh --topic order-events \
--from-beginning --max-messages 1 --bootstrap-server localhost:9092
# ====== 查看消息数量 ======
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 --topic order-events
# ====== 查看集群元数据 ======
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
bin/kafka-metadata.sh --snapshot --command-id any --bootstrap-server localhost:9092
# ====== 查看 Broker 状态 ======
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092
# ====== 分区重分配 ======
# 创建重分配计划
cat > /tmp/reassign.json << 'EOF'
{
"version": 1,
"partitions": [
{"topic": "order-events", "partition": 0, "replicas": [1, 2]},
{"topic": "order-events", "partition": 1, "replicas": [2, 1]}
]
}
EOF
# 执行重分配
bin/kafka-reassign-partitions.sh --execute \
--reassignment-json-file /tmp/reassign.json \
--bootstrap-server localhost:9092
# 验证重分配状态
bin/kafka-reassign-partitions.sh --verify \
--reassignment-json-file /tmp/reassign.json \
--bootstrap-server localhost:9092
# ====== 消费组问题排查 ======
# 检查是否有消费者不稳定导致频繁 Rebalance
bin/kafka-consumer-groups.sh --describe --group order-service-group \
--bootstrap-server localhost:9092
# 手动移除不活跃的消费者
bin/kafka-consumer-groups.sh --group order-service-group \
--bootstrap-server localhost:9092 --reset-offsets --execute \
--to-latest --all-topics常见排障维度:
- Producer 发不进去:网络、Topic 不存在、acks/权限问题
- Consumer 收不到:group、offset、topic、反序列化问题
- 消费积压:处理太慢、分区太少、消息峰值太高
- 重复消费:提交 offset 时机不合理监控指标
# ====== JMX 监控配置 ======
# 在启动参数中添加 JMX 端口
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.port=9999 \
-Dcom.sun.management.jmxremote.rmi.port=9999 \
-Dcom.sun.management.jmxremote.local.only=false \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false"
bin/kafka-server-start.sh -daemon config/kraft/server.properties
# ====== 关键监控指标 ======
# 1. BytesInPerSec / BytesOutPerSec - 吞吐量
# 2. MessagesInPerSec - 消息速率
# 3. TotalLogSize - 日志大小
# 4. NumLogSegments - 日志段数量
# 5. UnderReplicatedPartitions - 欠副本分区数(应始终为 0)
# 6. OfflinePartitions - 离线分区数(应始终为 0)
# 7. ActiveControllerCount - 活跃 Controller 数量
# 8. RequestQueueSize - 请求队列大小
# 9. NetworkProcessorAvgIdlePercent - 网络 IO 空闲率
# 10. Consumer Group Lag - 消费延迟
# ====== 使用 kafka-consumer-groups 监控 Lag ======
# 编写监控脚本
#!/bin/bash
while true; do
echo "=== $(date) ==="
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--all-groups --all-topics --describe 2>/dev/null | \
awk 'NR>1 {lag+=$5} END {print "Total Lag: " lag}'
sleep 30
done优点
缺点
总结
Kafka 更适合"事件流"和"高吞吐异步链路",而不是简单把它当成"另一个消息队列"。真正落地时,最关键的是 Topic 设计、分区策略、消费组模型、Offset 提交方式和失败处理策略,这些才决定了系统是否稳定可维护。
关键知识点
- Topic 是逻辑分类,Partition 决定并发和有序性边界。
- Offset 管理决定重复消费和丢消息风险。
- Consumer Group 决定消息是广播还是组内分摊。
- Kafka 的优势在流式事件链路,而不是所有异步场景通杀。
- KRaft 模式(Kafka 3.0+)取代 ZooKeeper,简化部署和运维。
- min.insync.replicas 决定数据可靠性上限。
- acks=all + min.insync.replicas >= 2 实现高可靠生产。
项目落地视角
- 订单、支付、库存等领域事件适合用 Kafka 做解耦和广播。
- 日志采集、埋点流水、CDC 同步非常适合 Kafka。
- 对金融、库存类业务,消费者幂等设计比"会不会消费"更重要。
- 大流量场景要把 Lag、分区、消费耗时纳入常态监控。
常见误区
- 只会创建 Topic,不会设计事件模型和消费边界。
- 消费成功前就提交 offset,导致消息丢失风险。
- 认为分区越多越好,忽略分区治理和运维成本。
- 把 Kafka 当同步调用替代品使用,结果复杂度徒增。
- 不配置 min.insync.replicas 导致副本数据不一致。
- 消费者处理时间过长导致频繁 Rebalance。
- 不清理过期的消费组 Offset 占用大量磁盘空间。
进阶路线
- 深入学习 KRaft、ISR、Leader 选举与副本同步机制。
- 学习 Schema Registry、Avro / Protobuf 等序列化治理。
- 研究 Kafka Connect、CDC、流式 ETL 与 Flink / Spark 集成。
- 学习 Exactly-Once、事务消息和幂等消费设计。
适用场景
- 事件驱动架构。
- 微服务异步解耦。
- 日志与埋点数据管道。
- 实时分析与流式处理。
落地建议
- 先设计事件模型,再建 Topic,不要反过来。
- 明确每个消费者的幂等策略、重试策略和死信策略。
- 把消费积压和 offset lag 纳入监控看板。
- 对关键 Topic 统一约定命名规范、字段结构和版本演进方式。
- 生产环境建议 replication-factor >= 2,min.insync.replicas >= 1。
排错清单
- 发不出消息:检查 broker、topic、认证和序列化。
- 收不到消息:检查消费组、offset 起点、topic 名和订阅逻辑。
- 消费重复:检查 offset 提交时机和幂等处理。
- 积压严重:检查分区数、消费耗时、下游依赖和峰值流量。
复盘问题
- 当前业务真的适合 Kafka,还是更适合简单消息队列?
- 你的 Topic 是按业务事件划分,还是按临时代码模块划分?
- 消费失败后,你们的系统会重试、跳过还是阻塞?
- 如果消息堆积 10 倍,现有消费模型还能撑住吗?
