RabbitMQ
大约 16 分钟约 4855 字
RabbitMQ
简介
RabbitMQ 是一个经典的消息中间件,最擅长做可靠消息投递、灵活路由、削峰填谷和系统解耦。相比 Kafka 更偏事件流和高吞吐,RabbitMQ 更适合任务分发、业务异步通知、延迟队列、死信处理和精细消息路由场景。真正落地时,重点不是装起来,而是把交换机、队列、路由键、确认机制、死信策略和消费失败处理一起设计清楚。RabbitMQ 基于 AMQP(Advanced Message Queuing Protocol)协议实现,由 Erlang 语言编写,具有高可用性、高可靠性和可扩展性。它最初由 RabbitMQ Ltd 在 2007 年发布,目前由 VMware/Broadcom 维护。
特点
实现
核心概念
RabbitMQ 常见概念:
- Producer:消息生产者
- Consumer:消息消费者
- Queue:消息存储缓冲区
- Exchange:消息路由器
- Binding:Exchange 和 Queue 的绑定规则
- Routing Key:消息路由键
- Channel:连接内的轻量通道
- Virtual Host:多业务隔离的逻辑命名空间消息流转:
Producer -> Exchange -> Queue -> Consumer核心概念详解
====== 连接与通道 ======
Connection:TCP 长连接,开销大,建议复用
Channel:Connection 内的多路复用通道,轻量级
- 一个 Connection 可以包含多个 Channel
- 每个线程建议使用独立的 Channel
- Channel 不是线程安全的
====== Exchange 交换机 ======
Exchange 接收 Producer 发送的消息,根据路由规则将消息路由到一个或多个 Queue。
Exchange 本身不存储消息。
====== Queue 队列 ======
Queue 是消息的存储缓冲区,遵循 FIFO 原则。
- 持久化队列:服务器重启后仍然存在
- 临时队列:连接关闭后自动删除
====== Binding 绑定 ======
Binding 是 Exchange 和 Queue 之间的关联关系,包含一个 Binding Key。
当消息的 Routing Key 匹配 Binding Key 时,消息被路由到对应的 Queue。
====== Virtual Host ======
Virtual Host 是逻辑上的消息服务器隔离。
- 不同的 vhost 之间完全隔离
- 适合多租户、多环境隔离
- 类似 MySQL 的 database 概念
====== Message 消息 ======
Message 包含两部分:
- Body:消息体(实际数据)
- Properties:消息属性(Content-Type、Priority、Expiration、MessageId 等)交换机类型与适用场景
direct:
- 精确匹配 routing key
- 适合明确目标队列场景
fanout:
- 广播到所有绑定队列
- 适合通知、广播、配置刷新
topic:
- 支持通配符匹配
- 适合按业务域、事件类型做更灵活路由
headers:
- 按 header 匹配
- 相对少用交换机类型详解
====== direct 交换机 ======
- Routing Key 必须与 Binding Key 完全匹配
- 适用场景:点对点消息、任务分发
- 示例:
Exchange: order.exchange (direct)
Binding: order.exchange -> order.queue (routing key: order.create)
发送消息 routing key = "order.create" -> 匹配成功
发送消息 routing key = "order.cancel" -> 不匹配
====== fanout 交换机 ======
- 忽略 Routing Key,广播到所有绑定的 Queue
- 适用场景:日志广播、配置变更通知
- 示例:
Exchange: log.exchange (fanout)
Binding: log.exchange -> log.file.queue
Binding: log.exchange -> log.db.queue
发送任何消息 -> 同时路由到两个 Queue
====== topic 交换机 ======
- Routing Key 支持通配符
- * 匹配一个单词,# 匹配零个或多个单词
- 单词以 . 分隔
- 适用场景:事件驱动、多业务域路由
- 示例:
Exchange: event.exchange (topic)
Binding: event.exchange -> order.queue (routing key: order.*)
Binding: event.exchange -> all.queue (routing key: #)
发送 "order.created" -> 匹配 order.* 和 #
发送 "user.registered" -> 只匹配 #
====== headers 交换机 ======
- 根据消息的 headers 属性匹配
- x-match: all(所有 header 都匹配)或 any(任一 header 匹配)
- 适用场景:复杂条件路由(较少使用)Linux 安装与管理(现代化示意)
# 安装 Erlang 仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | bash
# 安装 Erlang
yum install -y erlang完整安装指南
# ====== 方法 1:使用官方仓库安装(推荐) ======
# 安装 Erlang 依赖
yum install -y epel-release
yum install -y socat logrotate
# 安装 Erlang 仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | bash
# 安装指定版本的 Erlang(避免版本不兼容)
yum install -y erlang-25.3.2
# 安装 RabbitMQ 仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | bash
# 安装 RabbitMQ(指定版本)
yum install -y rabbitmq-server-3.12.10
# ====== 方法 2:下载 RPM 包离线安装 ======
# 下载地址:https://github.com/rabbitmq/rabbitmq-server/releases
# 下载地址:https://packagecloud.io/rabbitmq/rabbitmq-server
# 安装 Erlang RPM
yum install -y erlang-25.3.2-1.el7.x86_64.rpm
# 安装 RabbitMQ RPM
yum install -y rabbitmq-server-3.12.10-1.el7.noarch.rpm
# ====== 方法 3:使用 EPEL 仓库安装(版本可能较旧) ======
yum install -y epel-release
yum install -y rabbitmq-server# 启动与开机自启
systemctl enable --now rabbitmq-server
systemctl status rabbitmq-server服务管理
# ====== systemd 服务管理 ======
systemctl start rabbitmq-server # 启动
systemctl stop rabbitmq-server # 停止
systemctl restart rabbitmq-server # 重启
systemctl status rabbitmq-server # 状态
systemctl reload rabbitmq-server # 重新加载配置
# 查看服务日志
journalctl -u rabbitmq-server -f
journalctl -u rabbitmq-server --since "2024-01-01"
# ====== rabbitmqctl 管理 ======
rabbitmqctl status # 查看服务状态
rabbitmqctl status | grep -A5 "Listeners" # 查看监听端口
rabbitmqctl environment # 查看环境变量
rabbitmqctl report # 生成详细报告
# 停止应用(保留 Erlang VM)
rabbitmqctl stop_app
# 启动应用
rabbitmqctl start_app
# 完全停止(包括 Erlang VM)
rabbitmqctl stop
# ====== 配置文件位置 ======
# 主配置文件
/etc/rabbitmq/rabbitmq.conf
# 高级配置(Erlang 格式)
/etc/rabbitmq/advanced.config
# 环境配置
/etc/rabbitmq/rabbitmq-env.conf配置文件详解
# ====== /etc/rabbitmq/rabbitmq.conf ======
# 新格式配置文件(推荐)
# 监听端口
listeners.tcp.default = 5672
management.tcp.port = 15672
# 日志配置
log.console.level = info
log.console.file = /var/log/rabbitmq/rabbitmq.log
log.file.level = info
log.file.path = /var/log/rabbitmq/rabbitmq.log
log.file.rotation.date = $D0
log.file.rotation.size = 52428800 # 50MB
# 内存水位线(默认 0.4,即 40% 内存)
vm_memory_high_watermark.relative = 0.6
# 磁盘水位线
disk_free_limit.absolute = 5GB
# 默认用户(建议创建后禁用 guest)
default_user = admin
default_pass = StrongPass123!
# 集群配置
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@node1
cluster_formation.classic_config.nodes.2 = rabbit@node2
cluster_formation.classic_config.nodes.3 = rabbit@node3
# ====== /etc/rabbitmq/rabbitmq-env.conf ======
# 环境变量配置
RABBITMQ_NODENAME=rabbit@$(hostname -s)
RABBITMQ_NODE_PORT=5672
RABBITMQ_MNESIA_BASE=/var/lib/rabbitmq/mnesia
RABBITMQ_LOG_BASE=/var/log/rabbitmq
# ====== /etc/rabbitmq/advanced.config ======
# Erlang 格式高级配置
[
{rabbit, [
{default_user, <<"admin">>},
{default_pass, <<"StrongPass123!">>},
{heartbeat, 60},
{tcp_listen_options, [{backlog, 128}, {nodelay, true}]}
]}
].# 启用管理控制台插件
rabbitmq-plugins enable rabbitmq_management
# 默认控制台地址
# http://<host>:15672插件管理
# ====== 查看所有可用插件 ======
rabbitmq-plugins list
# ====== 启用/禁用插件 ======
rabbitmq-plugins enable rabbitmq_management # 管理控制台
rabbitmq-plugins enable rabbitmq_shovel # 跨集群消息搬运
rabbitmq-plugins enable rabbitmq_shovel_management # Shovel 管理界面
rabbitmq-plugins enable rabbitmq_federation # 联邦插件
rabbitmq-plugins enable rabbitmq_federation_management
rabbitmq-plugins enable rabbitmq_prometheus # Prometheus 监控
rabbitmq-plugins enable rabbitmq_auth_mechanism_ssl # SSL 认证
rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 延迟消息插件
# 禁用插件
rabbitmq-plugins disable rabbitmq_management
# 查看已启用插件
rabbitmq-plugins list --enabled# 防火墙放行
firewall-cmd --add-port=5672/tcp --permanent
firewall-cmd --add-port=15672/tcp --permanent
firewall-cmd --reload防火墙与安全配置
# ====== 防火墙配置 ======
# AMQP 协议端口
firewall-cmd --add-port=5672/tcp --permanent
# 管理控制台端口
firewall-cmd --add-port=15672/tcp --permanent
# 节点间通信端口(集群使用)
firewall-cmd --add-port=25672/tcp --permanent
# Erlang 分布式通信端口(EPMD)
firewall-cmd --add-port=4369/tcp --permanent
firewall-cmd --reload
# 限制来源 IP(生产环境推荐)
firewall-cmd --permanent --zone=public --add-rich-rule='rule family="ipv4" source address="10.0.0.0/8" port protocol="tcp" port="5672" accept'
firewall-cmd --permanent --zone=public --add-rich-rule='rule family="ipv4" source address="192.168.1.0/24" port protocol="tcp" port="15672" accept'
firewall-cmd --reload
# ====== TLS/SSL 加密通信 ======
# 生成证书(使用 OpenSSL)
mkdir -p /etc/rabbitmq/ssl
cd /etc/rabbitmq/ssl
# 生成 CA 证书
openssl genrsa -out ca.key 2048
openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.crt
# 生成服务器证书
openssl genrsa -out server.key 2048
openssl req -new -key server.key -out server.csr
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 365 -sha256
# 配置 TLS
# 在 rabbitmq.conf 中添加:
# listeners.ssl.default = 5671
# ssl_options.cacertfile = /etc/rabbitmq/ssl/ca.crt
# ssl_options.certfile = /etc/rabbitmq/ssl/server.crt
# ssl_options.keyfile = /etc/rabbitmq/ssl/server.key
# ssl_options.verify = verify_peer
# ssl_options.fail_if_no_peer_cert = false用户、权限与 vhost
# 创建管理员账户
rabbitmqctl add_user admin StrongPass123!
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# 查看用户
rabbitmqctl list_users用户与权限管理详解
# ====== 用户管理 ======
# 创建用户
rabbitmqctl add_user deploy DeployPass2024!
rabbitmqctl add_user monitor MonitorPass2024!
rabbitmqctl add_user app_app AppPass2024!
# 修改密码
rabbitmqctl change_password deploy NewDeployPass2024!
# 删除用户
rabbitmqctl delete_user guest # 删除默认 guest 用户
# 查看用户列表
rabbitmqctl list_users
# user tags
# admin [administrator]
# deploy [monitoring]
# app_app []
# ====== 用户标签 ======
# management - 可访问管理控制台
# monitoring - 可访问管理控制台和监控信息
# policymaker - 可创建/删除策略和参数
# administrator - 完整管理权限
# 无标签 - 普通用户,只能访问被授权的资源
# 设置用户标签
rabbitmqctl set_user_tags deploy monitoring
rabbitmqctl set_user_tags app_app "" # 清除标签
# ====== 权限管理 ======
# 权限格式:configure write read(正则表达式)
# configure: 配置队列和交换机的权限
# write: 发布消息到交换机的权限
# read: 从队列消费消息的权限
# 设置完全权限
rabbitmqctl set_permissions -p / deploy ".*" ".*" ".*"
# 设置只读权限
rabbitmqctl set_permissions -p / monitor "" "" ".*"
# 设置只写权限(生产者)
rabbitmqctl set_permissions -p / app_app "" "app_.*" ""
# 设置特定前缀权限
rabbitmqctl set_permissions -p / app_app "app_.*" "app_.*" "app_.*"
# 查看权限
rabbitmqctl list_permissions -p /
rabbitmqctl list_user_permissions deploy
# 清除权限
rabbitmqctl clear_permissions -p / deploy
# ====== Virtual Host 管理 ======
# 创建 vhost
rabbitmqctl add_vhost order_prod
rabbitmqctl add_vhost order_staging
rabbitmqctl add_vhost user_prod
# 删除 vhost
rabbitmqctl delete_vhost order_staging
# 列出 vhost
rabbitmqctl list_vhosts
# 设置 vhost 权限
rabbitmqctl set_permissions -p order_prod admin ".*" ".*" ".*"
rabbitmqctl set_permissions -p order_prod deploy "^order\..*$" "^order\..*$" "^order\..*$"
# 列出 vhost 权限
rabbitmqctl list_permissions -p order_prod# 创建业务 vhost,做环境或业务隔离
rabbitmqctl add_vhost order-prod
rabbitmqctl set_permissions -p order-prod admin ".*" ".*" ".*"生产建议:
- 不要长期使用默认 guest/guest
- 不同业务 / 环境优先做 vhost 隔离
- 账号权限最小化,不要所有应用都给 administrator常用命令与运维排障
# 查看队列
rabbitmqctl list_queues name messages consumers
# 查看交换机
rabbitmqctl list_exchanges name type durable
# 查看绑定关系
rabbitmqctl list_bindings
# 清空某个队列
rabbitmqctl purge_queue queue_name运维命令详解
# ====== 队列管理 ======
# 查看所有队列
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged consumers
# 查看持久化队列
rabbitmqctl list_queues name durable auto_delete
# 查看队列内存占用
rabbitmqctl list_queues name memory
# 清空队列消息
rabbitmqctl purge_queue order.created.queue
# 删除队列
rabbitmqctl delete_queue old.queue
# ====== 交换机管理 ======
# 查看所有交换机
rabbitmqctl list_exchanges name type durable internal
# ====== 绑定关系管理 ======
# 查看所有绑定
rabbitmqctl list_bindings source_name destination_name routing_key
# ====== 连接管理 ======
# 查看所有连接
rabbitmqctl list_connections
# 查看连接详情
rabbitmqctl list_connections name state channels timeout
# 关闭连接(优雅关闭)
rabbitmqctl close_connection "connection_name" "Closing for maintenance"
# 关闭所有连接(维护时使用)
rabbitmqctl list_connections -q | awk '{print $1}' | while read conn; do
rabbitmqctl close_connection "$conn" "Maintenance"
done
# ====== Channel 管理 ======
rabbitmqctl list_channels connection name state consumer_count
# ====== 消费者管理 ======
rabbitmqctl list_consumers queue_name channel_name consumer_tag ack_required
# ====== 集群管理 ======
rabbitmqctl cluster_status
# 加入集群
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# 离开集群
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
# 设置集群节点类型
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all","ha-sync-mode":"automatic"}' --apply-to queues
# ====== 策略管理 ======
rabbitmqctl list_policies
# 查看服务器状态
rabbitmqctl status
rabbitmqctl node_health_check
# 查看内存使用
rabbitmqctl eval 'rabbit_diagnostics:used_memory().'
# 查看进程信息
rabbitmqctl eval 'erlang:process_info(self()).'# 查看集群状态
rabbitmqctl cluster_status
# 查看策略
rabbitmqctl list_policies排障时最常看的不是"服务起没起",而是:
- 消息有没有堆积
- 消费者是不是挂了
- 路由是不是走错了
- ACK 是否正常排障指南
# ====== 消息堆积排查 ======
# 查看队列消息数
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged consumers
# messages_ready: 等待被消费的消息数
# messages_unacknowledged: 已发送给消费者但未收到 ACK 的消息数
# consumers: 当前连接的消费者数量
# 如果 messages_ready 持续增长:
# 1. 检查消费者是否存活
# 2. 检查消费者处理速度
# 3. 检查下游依赖是否正常
# 4. 考虑增加消费者实例
# 如果 messages_unacknowledged 持续增长:
# 1. 消费者处理超时
# 2. 消费者处理失败未 ACK
# 3. 消费者网络中断
# ====== 消息丢失排查 ======
# 1. 确认交换机和队列是否持久化
rabbitmqctl list_exchanges name type durable
rabbitmqctl list_queues name durable
# 2. 确认消息发送是否成功(Publisher Confirm)
# 3. 检查是否有 mandatory 路由失败
# 4. 检查死信队列中是否有消息
# ====== 连接问题排查 ======
# 查看连接状态
rabbitmqctl list_connections name state recv_oct send_oct
# state 应为 running
# recv_oct/send_oct 应在增长
# 查看认证失败日志
grep "authentication_failure" /var/log/rabbitmq/rabbitmq.log
# ====== 性能排查 ======
# 查看队列内存占用
rabbitmqctl list_queues name memory messages
# 查看文件描述符使用
rabbitmqctl status | grep file_descriptors
# 增加文件描述符限制
echo "rabbitmq soft nofile 65536" >> /etc/security/limits.conf
echo "rabbitmq hard nofile 65536" >> /etc/security/limits.conf
# ====== 磁盘空间告警 ======
# RabbitMQ 在磁盘空间不足时会阻塞消息发布
# 查看磁盘水位线设置
rabbitmqctl status | grep disk_free_limit
# 查看实际磁盘空间
df -h /var/lib/rabbitmq
# 修改磁盘水位线
# 在 rabbitmq.conf 中设置:
# disk_free_limit.absolute = 5GB典型路由与死信思路
订单场景示意:
- exchange: order.events
- routing key: order.created / order.paid / order.cancelled
- queue:
- order.inventory
- order.notification
- order.analytics死信与重试常见设计:
- 主队列消费失败
- 进入死信交换机
- 路由到重试队列 / 死信队列
- 延迟后再次回到主队列,或人工处理死信队列与延迟队列配置
# ====== 死信队列配置 ======
# 死信队列(DLQ)设计思路:
# 1. 创建死信交换机(DLX)
# 2. 创建死信队列
# 3. 将死信队列绑定到死信交换机
# 4. 在主队列上设置 x-dead-letter-exchange 参数
# 通过策略设置死信(推荐)
rabbitmqctl set_policy DLX "^order\." \
'{"dead-letter-exchange":"order.dlx"}' \
--apply-to queues
# 通过命令行创建死信队列
rabbitmqadmin declare exchange name=order.dlx type=direct durable=true
rabbitmqadmin declare queue name=order.dlq durable=true
rabbitmqadmin declare binding source=order.dlx destination=order.dlq routing_key="#"
# ====== 延迟队列配置 ======
# 方法 1:使用延迟消息插件(推荐)
# 安装延迟消息插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 创建延迟交换机
rabbitmqadmin declare exchange name=order.delayed type=x-delayed-message \
durable=true arguments='{"x-delayed-type":"direct"}'
# 方法 2:使用 TTL + 死信实现延迟
# 创建带 TTL 的临时队列,过期后消息路由到死信队列
rabbitmqadmin declare queue name=order.retry.30s durable=true \
arguments='{"x-dead-letter-exchange":"order.events","x-message-ttl":30000,"x-dead-letter-routing-key":"order.retry"}'
# ====== 优先级队列 ======
rabbitmqadmin declare queue name=order.priority durable=true \
arguments='{"x-max-priority":10}'
# ====== 消息 TTL 配置 ======
# 队列级别 TTL(所有消息统一过期时间)
rabbitmqadmin declare queue name=order.ttl durable=true \
arguments='{"x-message-ttl":60000}' # 60 秒
# 消息级别 TTL(在发送消息时设置)
# 通过 properties 设置 expiration
# ====== 队列长度限制 ======
rabbitmqadmin declare queue name=order.limited durable=true \
arguments='{"x-max-length":10000,"x-overflow":"reject-publish"}'
# overflow 选项:
# reject-publish: 丢弃新消息(默认)
# reject-publish-dlx: 丢弃新消息到死信队列
# drop-head: 丢弃队列头部的消息集群配置
# ====== 标准集群(镜像队列) ======
# 节点 1 配置
cat > /etc/rabbitmq/rabbitmq.conf << 'EOF'
listeners.tcp.default = 5672
management.tcp.port = 15672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@node1
cluster_formation.classic_config.nodes.2 = rabbit@node2
cluster_formation.classic_config.nodes.3 = rabbit@node3
EOF
# 设置 Erlang Cookie(所有节点必须相同)
cat > /var/lib/rabbitmq/.erlang.cookie << 'EOF'
MYSECRETCOOKIE123456
EOF
chmod 600 /var/lib/rabbitmq/.erlang.cookie
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
# 在节点 2 和节点 3 上加入集群
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# 设置高可用策略(所有队列在所有节点上都有副本)
rabbitmqctl set_policy ha-all ".*" \
'{"ha-mode":"all","ha-sync-mode":"automatic"}' \
--apply-to queues
# 查看集群状态
rabbitmqctl cluster_status
# ====== Quorum Queue(RabbitMQ 3.8+,推荐替代镜像队列) ======
# Quorum Queue 使用 Raft 共识算法,数据一致性更强
# 创建 Quorum Queue
rabbitmqadmin declare queue name=order.qq type=quorum durable=true
# 通过策略自动将普通队列升级
rabbitmqctl set_policy quorum "^qq\." \
'{"quorum-initial-group-size":3}' \
--apply-to queues --queues-type quorum优点
缺点
总结
RabbitMQ 最适合的是"精细业务消息路由"和"可靠异步处理",而不是把它简单看作"另一个中间件"。真正落地时,最关键的是明确消息模型、路由结构、确认机制、失败补偿和队列隔离,否则消息系统会很快从解耦工具变成排障黑洞。
关键知识点
- Exchange 决定消息怎么分发,Queue 决定消息存在哪。
- ACK 与重试策略直接决定消息可靠性。
- RabbitMQ 很适合业务任务队列,不一定适合超大事件流。
- 死信、延迟、重试必须从业务角度先设计,而不是上线后再补。
- 生产者确认(Publisher Confirm)保证消息到达服务器。
- 消费者 ACK 保证消息被正确处理。
- 持久化(durable)保证服务器重启后数据不丢失。
- Quorum Queue 是镜像队列的升级方案,推荐新项目使用。
项目落地视角
- 订单创建后,库存、通知、分析可通过不同队列异步处理。
- 后台任务分发、短信发送、邮件发送都常用 RabbitMQ。
- 秒杀、削峰类场景也可用 RabbitMQ 做异步缓冲。
- 真正生产系统里要建立队列积压、消费者数量、投递失败率监控。
- 建议使用 vhost 隔离不同环境和业务。
- 关键业务消息建议开启 Publisher Confirm 和手动 ACK。
常见误区
- 只建一个队列处理所有业务消息。
- 不做 ACK / 死信策略,消费失败后消息行为不可控。
- 所有业务共用一个 vhost、一个大管理员账号。
- 出现堆积只会重启服务,不分析消费速度和消息路由。
- 忘记设置队列和交换机的持久化(durable=true)。
- 使用自动 ACK 而非手动 ACK,消费失败消息会丢失。
- 不限制队列长度,消息无限堆积导致内存溢出。
- 镜像队列未同步完成就宕机,导致数据丢失。
进阶路线
- 深入学习确认机制、事务、Publisher Confirm 和消费者幂等。
- 学习延迟队列、死信重试、优先级队列和 Quorum Queue。
- 研究 RabbitMQ 集群、镜像队列和网络分区处理。
- 与 ASP.NET Core MassTransit、Outbox 等模式结合使用。
- 学习 RabbitMQ Shovel 实现跨集群消息同步。
- 研究 Prometheus + Grafana 监控 RabbitMQ 指标。
适用场景
- 异步任务队列。
- 订单、通知、邮件、短信等业务异步处理。
- 削峰填谷与缓冲场景。
- 需要精细消息路由和可靠投递的业务系统。
落地建议
- 先设计业务事件和路由,再定义 Exchange / Queue。
- 每个队列都明确 ACK、重试、死信策略。
- 通过 vhost、账号权限、命名规范做环境和业务隔离。
- 把积压、死信、连接数、消费者存活纳入监控。
- 队列和交换机都设置 durable=true,消息设置 persistent=true。
- 生产环境建议关闭 guest 用户,创建专用账号。
排错清单
- 发不进去:检查交换机、权限、连接和 routing key。
- 收不到:检查绑定关系、队列名、消费者订阅和 ACK 状态。
- 大量堆积:检查消费者速度、下游依赖和消息量峰值。
- 死信过多:检查业务异常、重试逻辑和消息格式。
- 连接断开:检查网络、防火墙、心跳超时和文件描述符限制。
- 内存告警:检查队列消息数、消费者处理速度和 vm_memory_high_watermark。
