MongoDB 高级应用
大约 10 分钟约 3088 字
MongoDB 高级应用
简介
MongoDB 是文档型 NoSQL 数据库,以灵活的文档模型和水平扩展能力著称。高级应用涵盖聚合管道、索引策略、分片集群、事务支持和 Change Stream。适用于内容管理、IoT 数据、实时分析等场景。
特点
聚合管道
常用阶段
// MongoDB 聚合管道示例
// 1. 统计每个分类的商品数量和平均价格
db.products.aggregate([
{ $match: { status: "active" } },
{ $group: {
_id: "$category",
count: { $sum: 1 },
avgPrice: { $avg: "$price" },
maxPrice: { $max: "$price" },
totalRevenue: { $sum: "$salesAmount" }
}},
{ $sort: { count: -1 } },
{ $limit: 10 }
])
// 2. 订单统计 — 按月分组
db.orders.aggregate([
{ $match: { createdAt: { $gte: ISODate("2026-01-01") } } },
{ $group: {
_id: { $dateToString: { format: "%Y-%m", date: "$createdAt" } },
orderCount: { $sum: 1 },
totalAmount: { $sum: "$totalAmount" },
avgAmount: { $avg: "$totalAmount" }
}},
{ $sort: { _id: 1 } }
])
// 3. 多表关联 — Lookup
db.orders.aggregate([
{ $lookup: {
from: "users",
localField: "userId",
foreignField: "_id",
as: "user"
}},
{ $unwind: "$user" },
{ $lookup: {
from: "products",
localField: "items.productId",
foreignField: "_id",
as: "productDetails"
}},
{ $project: {
orderNo: 1,
totalAmount: 1,
userName: "$user.name",
productCount: { $size: "$items" }
}}
])
// 4. 分桶统计 — 价格区间
db.products.aggregate([
{ $bucket: {
groupBy: "$price",
boundaries: [0, 50, 100, 200, 500, 1000, Infinity],
default: "other",
output: {
count: { $sum: 1 },
names: { $push: "$name" }
}
}}
])索引策略
索引类型
// 1. 单字段索引
db.orders.createIndex({ userId: 1 })
// 2. 复合索引(ESR 原则:Equality, Sort, Range)
db.orders.createIndex({ userId: 1, status: 1, createdAt: -1 })
// 3. 文本索引
db.articles.createIndex({ title: "text", content: "text" })
db.articles.find({ $text: { $search: "MongoDB 性能优化" } })
// 4. 地理空间索引
db.locations.createIndex({ position: "2dsphere" })
db.locations.find({
position: {
$near: { $geometry: { type: "Point", coordinates: [113.93, 22.53] }, $maxDistance: 5000 }
}
})
// 5. TTL 索引 — 自动过期删除
db.logs.createIndex({ createdAt: 1 }, { expireAfterSeconds: 86400 }) // 1天后自动删除
// 6. 哈希索引 — 分片键
db.users.createIndex({ userId: "hashed" })
// 查看索引
db.orders.getIndexes()
// 分析查询
db.orders.find({ userId: 1001 }).explain("executionStats").NET 集成
MongoDB C# Driver
dotnet add package MongoDB.Driver/// <summary>
/// MongoDB .NET 操作
/// </summary>
public class MongoDbContext
{
private readonly IMongoDatabase _database;
public MongoDbContext(IConfiguration config)
{
var client = new MongoClient(config.GetConnectionString("MongoDB"));
_database = client.GetDatabase("MyApp");
}
public IMongoCollection<T> GetCollection<T>(string? name = null)
{
return _database.GetCollection<T>(name ?? typeof(T).Name);
}
}
// Repository
public class ProductRepository
{
private readonly IMongoCollection<Product> _collection;
public ProductRepository(MongoDbContext context)
{
_collection = context.GetCollection<Product>("Products");
}
// CRUD
public async Task<List<Product>> GetAllAsync(int page, int pageSize)
{
return await _collection
.Find(p => p.IsActive)
.SortByDescending(p => p.CreatedAt)
.Skip((page - 1) * pageSize)
.Limit(pageSize)
.ToListAsync();
}
public async Task<Product?> GetByIdAsync(string id)
{
var filter = Builders<Product>.Filter.Eq(p => p.Id, id);
return await _collection.Find(filter).FirstOrDefaultAsync();
}
public async Task CreateAsync(Product product)
{
await _collection.InsertOneAsync(product);
}
public async Task UpdateAsync(string id, Product product)
{
var filter = Builders<Product>.Filter.Eq(p => p.Id, id);
await _collection.ReplaceOneAsync(filter, product);
}
public async Task DeleteAsync(string id)
{
var filter = Builders<Product>.Filter.Eq(p => p.Id, id);
await _collection.DeleteOneAsync(filter);
}
// 聚合查询
public async Task<List<CategoryStats>> GetCategoryStatsAsync()
{
return await _collection.Aggregate()
.Match(p => p.IsActive)
.Group(p => p.Category, g => new CategoryStats
{
Category = g.Key,
Count = g.Count(),
AvgPrice = g.Average(p => p.Price),
MaxPrice = g.Max(p => p.Price)
})
.SortByDescending(s => s.Count)
.Limit(10)
.ToListAsync();
}
}
// 文档模型
public class Product
{
[BsonId]
[BsonRepresentation(BsonType.ObjectId)]
public string Id { get; set; } = null!;
public string Name { get; set; } = "";
public string Category { get; set; } = "";
public decimal Price { get; set; }
public bool IsActive { get; set; }
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
}
public class CategoryStats
{
public string Category { get; set; } = "";
public int Count { get; set; }
public double AvgPrice { get; set; }
public decimal MaxPrice { get; set; }
}事务支持
多文档事务
/// <summary>
/// MongoDB 多文档事务(副本集模式)
/// </summary>
public class OrderService
{
private readonly IMongoClient _client;
public OrderService(IMongoClient client)
{
_client = client;
}
public async Task CreateOrderWithItemsAsync(Order order, List<OrderItem> items)
{
using var session = await _client.StartSessionAsync();
try
{
session.StartTransaction();
var db = _client.GetDatabase("MyApp");
var orders = db.GetCollection<Order>("Orders");
var orderItems = db.GetCollection<OrderItem>("OrderItems");
await orders.InsertOneAsync(session, order);
await orderItems.InsertManyAsync(session, items);
await session.CommitTransactionAsync();
}
catch
{
await session.AbortTransactionAsync();
throw;
}
}
}Change Stream
实时数据变更监听
/// <summary>
/// Change Stream — 监听数据变更
/// </summary>
public class OrderChangeStreamService : BackgroundService
{
private readonly IMongoCollection<Order> _collection;
private readonly ILogger _logger;
public OrderChangeStreamService(IMongoDatabase database, ILogger<OrderChangeStreamService> logger)
{
_collection = database.GetCollection<Order>("Orders");
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Order>>();
using var cursor = await _collection.WatchAsync(pipeline, cancellationToken: stoppingToken);
await cursor.ForEachAsync(change =>
{
switch (change.OperationType)
{
case ChangeStreamOperationType.Insert:
_logger.LogInformation("新订单:{Id}", change.FullDocument.Id);
break;
case ChangeStreamOperationType.Update:
_logger.LogInformation("订单更新:{Id}", change.DocumentKey);
break;
case ChangeStreamOperationType.Delete:
_logger.LogInformation("订单删除:{Id}", change.DocumentKey);
break;
}
}, stoppingToken);
}
}优点
缺点
总结
MongoDB 适合文档型数据、快速迭代的项目。聚合管道替代复杂 SQL,Change Stream 实现实时通知,TTL 索引自动清理过期数据。与关系数据库互补使用:结构化数据用 MySQL,文档/非结构化数据用 MongoDB。
关键知识点
- 数据库主题一定要同时看数据模型、读写模式和执行代价。
- 很多性能问题不是 SQL 语法问题,而是索引、统计信息、事务和数据分布问题。
- 高可用、备份、迁移和治理与查询优化同样重要。
- 先把数据模型、访问模式和执行代价绑定起来理解。
项目落地视角
- 所有优化前后都保留执行计划、样本 SQL 和关键指标对比。
- 上线前准备回滚脚本、备份点和校验方案。
- 把连接池、锁等待、慢查询和容量增长纳入日常巡检。
- 保留执行计划、样本 SQL、索引定义和优化前后指标。
常见误区
- 脱离真实数据分布讨论索引或分片。
- 只看单条 SQL,不看整条业务链路的事务和锁。
- 把测试环境结论直接等同于生产环境结论。
- 脱离真实数据分布设计索引。
进阶路线
- 继续向执行计划、存储引擎、复制机制和数据治理层深入。
- 把主题与 ORM、缓存、消息队列和归档策略联动起来思考。
- 沉淀成数据库设计规范、SQL 审核规则和变更流程。
- 继续深入存储引擎、复制机制、归档与冷热分层治理。
适用场景
- 当你准备把《MongoDB 高级应用》真正落到项目里时,最适合先在一个独立模块或最小样例里验证关键路径。
- 适合数据建模、查询优化、事务控制、高可用和迁移治理。
- 当系统开始遇到慢查询、锁冲突、热点数据或容量增长时,这类主题价值最高。
落地建议
- 先分析真实查询模式、数据量级和写入特征,再决定索引或分片策略。
- 所有优化结论都结合执行计划、样本数据和监控指标验证。
- 高风险操作前准备备份、回滚脚本与校验 SQL。
排错清单
- 先确认瓶颈在 CPU、I/O、锁等待、网络还是 SQL 本身。
- 检查执行计划是否走错索引、是否发生排序或全表扫描。
- 排查长事务、隐式类型转换、统计信息过期和参数嗅探。
复盘问题
- 如果把《MongoDB 高级应用》放进你的当前项目,最先要验证的输入、输出和失败路径分别是什么?
- 《MongoDB 高级应用》最容易在什么规模、什么边界条件下暴露问题?你会用什么指标或日志去确认?
- 相比默认实现或替代方案,采用《MongoDB 高级应用》最大的收益和代价分别是什么?
分片集群设计
// 分片(Sharding)— 水平扩展的核心机制
// 分片键的选择原则:
// 1. 高基数(cardinality 高)— 分片键值域足够大
// 2. 写入分散 — 避免热点(不要用自增 ID)
// 3. 查询友好 — 大部分查询能通过分片键定位
// 启用分片
sh.enableSharding("MyApp")
sh.shardCollection("MyApp.orders", { userId: "hashed" })
// 分片策略对比:
// 1. 范围分片(Range)
// - 按分片键值的范围分配数据
// - 适合范围查询(userId > 1000 AND userId < 2000)
// - 缺点:写入热点(自增 ID 会导致所有写入集中在最后一个分片)
// 2. 哈希分片(Hashed)
// - 对分片键值做哈希,均匀分布
// - 适合高写入、随机查询场景
// - 缺点:不支持范围查询
// 3. 区域分片(Zone)
// - 按地理位置或数据中心分配数据
// - 适合数据本地化合规要求
// 常见分片键选择
db.orders.createIndex({ userId: "hashed" }) // 哈希分片 — 推荐
db.logs.createIndex({ createdAt: 1 }) // 范围分片 — 适合时间序列
db.users.createIndex({ region: 1, userId: 1 }) // 组合分片 — 适合多维度查询
// 查看分片状态
sh.status()
sh.getBalancerState()
db.orders.getShardDistribution()
// 数据块(Chunk)管理
sh.startBalancer() // 启动自动均衡
sh.stopBalancer() // 暂停均衡(维护时使用)
sh.moveChunk("MyApp.orders", { userId: "user123" }, "shard2") // 手动迁移读写分离与高可用
// 副本集(Replica Set)— 高可用基础
// 副本集配置
rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "mongo1:27017", priority: 2 }, // 主节点(优先级最高)
{ _id: 1, host: "mongo2:27017", priority: 1 }, // 从节点
{ _id: 2, host: "mongo3:27017", priority: 1, hidden: true } // 隐藏节点(用于备份)
]
})
// 读偏好(Read Preference)
// primary: 只从主节点读(默认,强一致性)
// primaryPreferred: 优先主节点,主节点不可用时读从节点
// secondary: 只从从节点读(最终一致性)
// nearest: 从延迟最低的节点读
// 在连接字符串中设置读偏好
// mongodb://user:pass@mongo1,mongo2,mongo3/mydb?readPreference=secondary
// 在查询级别设置读偏好
db.orders.find({ status: "completed" }).readPref("secondary")
// 写关注(Write Concern)
// w: 1 — 主节点确认写入即返回(默认)
// w: majority — 大多数节点确认写入
// w: "majority" + j: true — 大多数节点确认并写入日志
db.orders.insertOne({ ... }, { writeConcern: { w: "majority", j: true, wtimeout: 5000 } })
// 读关注(Read Concern)
// local: 返回最新数据,不保证持久化
// majority: 返回已确认写入大多数节点的数据
// linearizable: 只读已确认且持久化的数据(最严格)
db.orders.find({ status: "paid" }).readConcern("majority")数据建模模式
// MongoDB 文档建模的常见模式
// 1. 嵌入模式(Embedding)— 适合一对少关系
db.users.insertOne({
name: "张三",
email: "zhangsan@example.com",
address: {
province: "北京",
city: "北京",
detail: "海淀区xxx"
},
preferences: {
language: "zh-CN",
theme: "dark",
notifications: true
}
})
// 优点:一次查询获取所有数据,无需 JOIN
// 缺点:文档可能过大,嵌套更新复杂
// 2. 引用模式(Referencing)— 适合一对多关系
db.orders.insertOne({
userId: ObjectId("..."),
items: [
{ productId: ObjectId("..."), quantity: 2, price: 49.9 },
{ productId: ObjectId("..."), quantity: 1, price: 199.0 }
],
totalAmount: 298.8
})
// 查询时通过 $lookup 关联
db.orders.aggregate([
{ $lookup: {
from: "users",
localField: "userId",
foreignField: "_id",
as: "user"
}}
])
// 3. 属性模式(Attribute Pattern)— 适合动态属性
db.products.insertOne({
name: "手机",
attributes: [
{ key: "color", value: "黑色" },
{ key: "storage", value: "256GB" },
{ key: "weight", value: "200g" }
]
})
// 查询特定属性
db.products.find({ "attributes": { $elemMatch: { key: "color", value: "黑色" } } })
// 为属性模式创建索引
db.products.createIndex({ "attributes.key": 1, "attributes.value": 1 })
// 4. 桶模式(Bucket Pattern)— 适合时间序列
db.sensor_data.insertOne({
sensorId: "sensor-001",
date: ISODate("2026-04-14"),
hourlyReadings: {
"00": 23.5, "01": 22.1, "02": 21.8,
// ... 每小时一个字段
}
})
// 优点:大幅减少文档数量,查询效率高
// 缺点:不适合需要更新单个时间点的场景
// 5. 模式验证(Schema Validation)
db.createCollection("users", {
validator: {
$jsonSchema: {
bsonType: "object",
required: ["name", "email"],
properties: {
name: { bsonType: "string", minLength: 1 },
email: { bsonType: "string", pattern: "^[^@]+@[^@]+$" },
age: { bsonType: "int", minimum: 0, maximum: 150 }
}
}
},
validationAction: "error" // 或 "warn"
})MongoDB 性能监控与调优
// 性能监控命令
// 1. 慢查询分析
db.setProfilingLevel(1, { slowms: 100 }) // 记录超过 100ms 的操作
db.system.profile.find().sort({ ts: -1 }).limit(10) // 查看慢查询
// 2. 当前操作
db.currentOp() // 查看正在执行的操作
db.currentOp({ "op" : "query", "secs_running": { $gt: 5 } }) // 超过 5 秒的查询
// 3. 服务器状态
db.serverStatus()
db.serverStatus().connections // 连接数
db.serverStatus().network // 网络流量
// 4. 集合统计
db.orders.stats()
db.orders.totalIndexSize() // 索引总大小
db.orders.totalSize() // 数据总大小
// 5. 副本集状态
rs.status()
rs.printSecondaryReplicationInfo() // 从节点复制延迟
// 性能调优建议:
// - 使用 Explain 分析慢查询
// - 创建合适的索引(ESR 原则)
// - 避免 $where 和 $regex 前缀通配符
// - 使用 projection 只返回需要的字段
// - 批量写入优于逐条写入
// - 使用 WiredTiger 存储引擎的压缩选项// 常用索引维护
// 查看索引使用情况
db.orders.aggregate([
{ $indexStats: {} }
])
// 返回每个索引的使用次数
// 重建索引(碎片整理)
db.orders.reIndex() // 在线重建(会阻塞写入)
// 生产环境建议使用:
db.runCommand({ compact: "orders", force: true }) // 压缩集合
// TTL 索引自动清理
db.logs.createIndex({ createdAt: 1 }, { expireAfterSeconds: 2592000 }) // 30 天后删除
// 注意:TTL 索引后台线程每 60 秒运行一次,不保证精确时间
// 隐藏索引(测试删除索引的影响)
db.orders.hideIndex("idx_unused")
// 如果确认无影响,可以安全删除
db.orders.dropIndex("idx_unused")