MongoDB 聚合管道
大约 16 分钟约 4895 字
MongoDB 聚合管道
简介
MongoDB 聚合管道(Aggregation Pipeline)是 MongoDB 最强大的数据分析工具,通过多个阶段(Stage)对文档进行过滤、转换、分组和排序。它类似于 SQL 的 GROUP BY + JOIN + 子查询的组合,但以管道方式链式处理,每阶段的输出是下一阶段的输入。
聚合管道的设计理念源自 Unix 管道 —— 将复杂的数据处理任务拆分为多个简单步骤,每步只做一件事,通过串联组合实现强大的数据处理能力。MongoDB 4.2+ 进一步增强了聚合管道的能力,支持 $merge 增量更新和 $setWindowFields 窗口函数,使其在数据分析和 ETL 场景中更加得心应手。
特点
管道阶段完整参考
文档处理阶段:
- $project — 选择/排除/重命名字段,支持表达式计算
- $addFields — 添加新字段或覆盖已有字段
- $set — $addFields 的别名(MongoDB 3.4+)
- $unset — 移除指定字段(MongoDB 3.4+)
- $replaceRoot — 用嵌入文档替换根文档
过滤与排序:
- $match — 过滤文档,支持查询操作符
- $sort — 排序
- $skip — 跳过指定数量的文档
- $limit — 限制返回文档数
分组与聚合:
- $group — 分组聚合
- $bucket — 按范围分组
- $bucketAuto — 自动计算分桶边界
- $accumulator — 自定义累加器(MongoDB 4.4+)
- $function — 自定义 JavaScript 函数(MongoDB 4.4+)
数组操作:
- $unwind — 展开数组
- $isArray — 判断是否为数组
- $arrayElemAt — 获取数组指定位置的元素
- $filter — 过滤数组元素
- $map — 映射数组元素
- $reduce — 归约数组
- $in — 判断元素是否在数组中
- $size — 获取数组长度
- $slice — 截取数组子集
关联查询:
- $lookup — 左外连接
- $graphLookup — 递归图查询
集合操作:
- $out — 将结果写入集合(替换模式)
- $merge — 将结果合并到集合(增量模式)
- $unionWith — 合并多个集合
窗口函数:
- $setWindowFields — MongoDB 5.0+ 窗口函数实现
基础聚合:按月统计销售额
// 原始订单文档示例
// {
// _id: ObjectId("..."),
// user_id: ObjectId("..."),
// status: "completed",
// total_amount: 299.99,
// items: [{ product_id: "p1", qty: 2, price: 99.99 }],
// created_at: ISODate("2024-03-15T10:30:00Z")
// }
// 按月统计销售额和订单数
db.orders.aggregate([
// 阶段1: 过滤已完成订单 — 尽早放在管道前面
{ $match: {
status: "completed",
created_at: { $gte: ISODate("2024-01-01") }
}},
// 阶段2: 按月分组统计
{ $group: {
_id: {
year: { $year: "$created_at" },
month: { $month: "$created_at" }
},
totalRevenue: { $sum: "$total_amount" },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: "$total_amount" },
maxOrderValue: { $max: "$total_amount" },
minOrderValue: { $min: "$total_amount" }
}},
// 阶段3: 按月份排序
{ $sort: { "_id.year": 1, "_id.month": 1 } },
// 阶段4: 格式化输出
{ $project: {
month: { $dateToString: {
format: "%Y-%m",
date: {
$dateFromParts: {
year: "$_id.year",
month: "$_id.month"
}
}
}},
totalRevenue: { $round: ["$totalRevenue", 2] },
orderCount: 1,
avgOrderValue: { $round: ["$avgOrderValue", 2] },
maxOrderValue: 1,
minOrderValue: 1
}}
]);条件聚合与多级分组
// 按年、月、日三级分组 + 条件统计
db.orders.aggregate([
{ $match: { created_at: { $gte: ISODate("2024-01-01") } } },
{ $group: {
_id: {
year: { $year: "$created_at" },
month: { $month: "$created_at" },
day: { $dayOfMonth: "$created_at" }
},
totalOrders: { $sum: 1 },
completedOrders:{ $sum: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] } },
cancelledOrders:{ $sum: { $cond: [{ $eq: ["status", "cancelled"] }, 1, 0] } },
totalRevenue: { $sum: { $cond: [{ $eq: ["$status", "completed"] }, "$total_amount", 0] } },
totalRefund: { $sum: { $cond: [{ $eq: ["$status", "refunded"] }, "$total_amount", 0] } }
}},
{ $sort: { "_id.year": 1, "_id.month": 1, "_id.day": 1 } },
// 格式化日期
{ $project: {
date: { $dateToString: {
format: "%Y-%m-%d",
date: {
$dateFromParts: {
year: "$_id.year",
month: "$_id.month",
day: "$_id.day"
}
}
}},
totalOrders: 1,
completedOrders: 1,
cancelledOrders: 1,
completionRate: {
$round: [
{ $multiply: [
{ $divide: ["$completedOrders", "$totalOrders"] },
100
]},
1
]
},
totalRevenue: { $round: ["$totalRevenue", 2] },
totalRefund: { $round: ["$totalRefund", 2] },
netRevenue: { $round: [{ $subtract: ["$totalRevenue", "$totalRefund"] }, 2] }
}}
]);$lookup 关联查询
基础关联
// 订单关联用户信息(等同于 SQL 的 LEFT JOIN)
db.orders.aggregate([
{ $match: { status: "completed" } },
// 基础 $lookup:关联用户集合
{ $lookup: {
from: "users", // 关联的目标集合
localField: "user_id", // 当前集合的关联字段
foreignField: "_id", // 目标集合的关联字段
as: "user_info" // 输出字段名(数组)
}},
// 展开数组为单个对象(一对一关系)
{ $unwind: {
path: "$user_info",
preserveNullAndEmptyArrays: true // 保留无匹配的文档(LEFT JOIN)
}},
// 添加计算字段
{ $addFields: {
user_name: "$user_info.name",
user_email: "$user_info.email"
}},
// 投影输出
{ $project: {
user_info: 0 // 排除完整的用户信息对象
}}
]);Pipeline 形式的 $lookup(更强大)
// MongoDB 3.6+ 支持在 $lookup 内使用管道
// 可以在关联时添加过滤、投影等条件
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $lookup: {
from: "users",
let: { userId: "$user_id" }, // 将当前文档字段传入管道
pipeline: [
// 在关联集合中进行过滤
{ $match: {
$expr: { $eq: ["$_id", "$$userId"] }
}},
// 只选择需要的字段
{ $project: {
name: 1,
email: 1,
level: 1,
// 计算用户是否为 VIP
isVip: { $gte: ["$level", 3] }
}}
],
as: "user_info"
}},
{ $unwind: { path: "$user_info", preserveNullAndEmptyArrays: true } }
]);多集合关联
// 订单 -> 用户 -> 收货地址 -> 商品详情
db.orders.aggregate([
// 关联用户
{ $lookup: {
from: "users",
localField: "user_id",
foreignField: "_id",
as: "user"
}},
{ $unwind: { path: "$user", preserveNullAndEmptyArrays: true } },
// 关联收货地址
{ $lookup: {
from: "addresses",
let: { addressId: "$shipping_address_id" },
pipeline: [
{ $match: { $expr: { $eq: ["$_id", "$$addressId"] } } },
{ $project: { city: 1, province: 1, district: 1, detail: 1 } }
],
as: "shipping_address"
}},
{ $unwind: { path: "$shipping_address", preserveNullAndEmptyArrays: true } },
// 关联订单项 -> 商品
{ $lookup: {
from: "order_items",
localField: "_id",
foreignField: "order_id",
as: "items"
}},
// 对每个订单项关联商品信息
{ $lookup: {
from: "products",
localField: "items.product_id",
foreignField: "_id",
as: "product_details"
}},
// 最终投影
{ $project: {
order_id: "$_id",
order_date: "$created_at",
total_amount: 1,
user_name: "$user.name",
shipping_city: "$shipping_address.city",
item_count: { $size: "$items" }
}}
]);$unwind 展开数组
基础展开
// 统计文章标签使用频率
// 原始文档: { title: "...", tags: ["mongodb", "database", "nosql"], view_count: 1000 }
db.articles.aggregate([
{ $match: { status: "published" } },
// 展开标签数组 — 每个标签生成一条文档
{ $unwind: "$tags" },
// 按标签分组统计
{ $group: {
_id: "$tags",
articleCount: { $sum: 1 },
totalViews: { $sum: "$view_count" },
// 收集使用该标签的文章标题
articles: { $push: { title: "$title", views: "$view_count" } }
}},
{ $sort: { articleCount: -1 } },
{ $limit: 20 },
// 格式化输出
{ $project: {
tag: "$_id",
articleCount: 1,
totalViews: 1,
avgViews: { $round: [{ $divide: ["$totalViews", "$articleCount"] }, 0] },
// 只显示前 5 篇文章
topArticles: { $slice: ["$articles", 5] }
}}
]);展开嵌套对象数组
// 订单包含多个商品项,展开后按商品统计销量
// 原始文档: {
// _id: 1, status: "completed",
// items: [
// { product_id: "p1", name: "商品A", qty: 2, price: 99 },
// { product_id: "p2", name: "商品B", qty: 1, price: 199 }
// ]
// }
db.orders.aggregate([
{ $match: { status: "completed" } },
// 展开订单项数组
{ $unwind: "$items" },
// 按商品分组统计
{ $group: {
_id: "$items.product_id",
productName: { $first: "$items.name" },
totalQtySold: { $sum: { $multiply: ["$items.qty", 1] } },
totalRevenue: { $sum: { $multiply: ["$items.qty", "$items.price"] } },
orderCount: { $sum: 1 },
avgPrice: { $avg: "$items.price" }
}},
{ $sort: { totalRevenue: -1 } },
{ $limit: 50 }
]);带索引的 $unwind(includeArrayIndex)
// 展开数组时保留数组索引位置
db.events.aggregate([
{ $unwind: {
path: "$participants",
includeArrayIndex: "participantIndex" // 保留元素在数组中的位置
}},
{ $match: { "participants.role": "organizer" } },
{ $project: {
event_name: "$name",
event_date: "$date",
participant_name: "$participants.name",
participant_index: "$participantIndex"
}}
]);$facet 多维度统计
// 一次查询返回商品的多维度统计(适用于仪表板)
db.products.aggregate([
{ $facet: {
// 维度1: 按分类统计数量
byCategory: [
{ $group: {
_id: "$category",
count: { $sum: 1 },
avgPrice: { $avg: "$price" },
totalStock: { $sum: "$stock" }
}},
{ $sort: { count: -1 } },
{ $limit: 10 }
],
// 维度2: 价格区间分布
priceRange: [
{ $bucket: {
groupBy: "$price",
boundaries: [0, 50, 100, 500, 1000, 5000],
default: "5000+",
output: {
count: { $sum: 1 },
avgRating: { $avg: "$rating" }
}
}}
],
// 维度3: 总体统计
overall: [
{ $group: {
_id: null,
totalProducts: { $sum: 1 },
avgPrice: { $avg: "$price" },
maxPrice: { $max: "$price" },
minPrice: { $min: "$price" },
avgRating: { $avg: "$rating" },
totalStock: { $sum: "$stock" }
}}
],
// 维度4: 库存预警
lowStock: [
{ $match: { stock: { $lt: 10 } } },
{ $sort: { stock: 1 } },
{ $limit: 20 },
{ $project: {
name: 1,
price: 1,
stock: 1,
category: 1
}}
],
// 维度5: 最近上架
recent: [
{ $sort: { created_at: -1 } },
{ $limit: 5 },
{ $project: {
name: 1,
price: 1,
category: 1,
created_at: 1
}}
]
}}
]);
// 返回结构:
// {
// "byCategory": [...],
// "priceRange": [...],
// "overall": [...],
// "lowStock": [...],
// "recent": [...]
// }$merge 物化视图
// 将聚合结果合并到统计集合(增量更新)
// 每天定时运行,更新销售统计
db.orders.aggregate([
{ $match: {
status: "completed",
created_at: { $gte: ISODate("2024-01-01") }
}},
{ $group: {
_id: {
year: { $year: "$created_at" },
month: { $month: "$created_at" },
day: { $dayOfMonth: "$created_at" }
},
totalOrders: { $sum: 1 },
totalRevenue: { $sum: "$total_amount" },
avgOrderValue:{ $avg: "$total_amount" }
}},
// 将结果写入统计集合
{ $merge: {
into: "daily_sales_stats",
on: "_id", // 匹配字段(用于判断更新还是插入)
whenMatched: "replace", // 匹配时替换
whenNotMatched: "insert" // 不匹配时插入
}}
]);
// 查询时直接读预计算结果,速度极快
db.daily_sales_stats.find({
"_id.year": 2024,
"_id.month": 3
}).sort({ "_id.day": 1 });窗口函数 $setWindowFields
// MongoDB 5.0+ 窗口函数
// 计算每个客户的累计消费和消费排名
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $setWindowFields: {
partitionBy: "$user_id", // 按用户分区
sortBy: { created_at: 1 }, // 按时间排序
output: {
// 累计消费(从第一行到当前行)
cumulativeSpent: {
$sum: "$total_amount",
window: {
documents: ["unbounded", "current"]
}
},
// 订单序号
orderNumber: {
$documentNumber: {}
},
// 消费金额排名(降序)
amountRank: {
$rank: {
sortBy: { total_amount: -1 }
}
},
// 与上单的时间间隔(天数)
daysSinceLastOrder: {
$shift: {
output: {
$dateDiff: {
startDate: "$$ROOT.created_at",
endDate: { $dateTrunc: { date: "$created_at", unit: "day" } },
unit: "day"
}
},
by: -1
}
}
}
}},
// 只保留每个用户的最后一单(最新统计)
{ $group: {
_id: "$user_id",
latestOrderDate: { $last: "$created_at" },
cumulativeSpent: { $last: "$cumulativeSpent" },
totalOrders: { $last: "$orderNumber" },
maxSingleOrder: { $max: "$total_amount" }
}},
{ $sort: { cumulativeSpent: -1 } }
]);$graphLookup 递归图查询
// 查询组织架构:从某个员工出发,递归查找所有下属
db.employees.aggregate([
{ $match: { name: "张三" } },
{ $graphLookup: {
from: "employees",
startWith: "$_id", // 从当前员工 ID 开始
connectFromField: "_id", // 当前文档的关联字段
connectToField: "manager_id",// 关联到目标集合的字段
as: "subordinates", // 输出字段
depthField: "level", // 层级深度
maxDepth: 5 // 最大递归深度(防止无限递归)
}},
{ $project: {
name: 1,
department: 1,
subordinate_count: { $size: "$subordinates" },
max_depth: { $max: "$subordinates.level" }
}}
]);$replaceRoot 替换根文档
// 将嵌入文档提升为根文档
// 适用于嵌入文档与顶层文档结构相同的情况
// 场景:活动表中的最新获胜者信息
db.activities.aggregate([
{ $match: { status: "completed" } },
{ $project: {
// 从获胜者数组中取第一个
winner: { $arrayElemAt: ["$winners", 0] },
activity_name: "$name",
prize: "$prize"
}},
// 将 winner 对象提升为根文档
{ $replaceRoot: {
newRoot: {
$mergeObjects: [
"$winner",
{
activity_name: "$activity_name",
prize: "$prize"
}
]
}
}}
]);C# Driver 使用聚合管道
基础用法
using MongoDB.Bson;
using MongoDB.Driver;
public class OrderService
{
private readonly IMongoCollection<Order> _orders;
public OrderService(IMongoDatabase database)
{
_orders = database.GetCollection<Order>("orders");
}
// 按月统计销售额
public async Task<List<MonthlySales>> GetMonthlySalesAsync(int year)
{
var pipeline = new BsonDocument[]
{
new BsonDocument("$match", new BsonDocument
{
{ "status", "completed" },
{ "created_at", new BsonDocument("$gte", new DateTime(year, 1, 1, 0, 0, 0, DateTimeKind.Utc)) }
}),
new BsonDocument("$group", new BsonDocument
{
{ "_id", new BsonDocument
{
{ "year", new BsonDocument("$year", "$created_at") },
{ "month", new BsonDocument("$month", "$created_at") }
}
},
{ "totalRevenue", new BsonDocument("$sum", "$total_amount") },
{ "orderCount", new BsonDocument("$sum", 1) },
{ "avgOrderValue", new BsonDocument("$avg", "$total_amount") }
}),
new BsonDocument("$sort", new BsonDocument
{
{ "_id.year", 1 },
{ "_id.month", 1 }
})
};
var result = await _orders.Aggregate<MonthlySales>(pipeline)
.ToListAsync();
return result;
}
}
public class MonthlySales
{
public MonthKey Id { get; set; } = new();
public decimal TotalRevenue { get; set; }
public int OrderCount { get; set; }
public decimal AvgOrderValue { get; set; }
}
public class MonthKey
{
public int Year { get; set; }
public int Month { get; set; }
}使用 LINQ 风格的聚合定义
// 使用 Aggregate Fluent API(更接近 LINQ 风格)
public async Task<List<CustomerRanking>> GetCustomerRankingAsync(int topN)
{
var result = await _orders.Aggregate()
.Match(x => x.Status == "completed")
.Group(
x => x.UserId,
g => new CustomerRanking
{
UserId = g.Key,
TotalSpent = g.Sum(x => x.TotalAmount),
OrderCount = g.Count(),
AvgOrderValue = g.Average(x => x.TotalAmount)
})
.SortByDescending(x => x.TotalSpent)
.Limit(topN)
.ToListAsync();
return result;
}带 allowDiskUse 的大数据量聚合
public async Task<List<BsonDocument>> GetLargeAggregationAsync()
{
var options = new AggregateOptions
{
AllowDiskUse = true, // 允许使用磁盘临时文件
MaxTime = TimeSpan.FromMinutes(5), // 超时时间
BatchSize = 1000 // 批次大小
};
var pipeline = new BsonDocument[]
{
// ... 复杂的聚合管道
};
return await _orders.Aggregate<BsonDocument>(pipeline, options)
.ToListAsync();
}性能优化
$match 前置原则
// 错误:$match 在管道后面
db.orders.aggregate([
{ $group: { _id: "$user_id", total: { $sum: "$total_amount" } } },
{ $match: { total: { $gt: 1000 } } } // 先分组再过滤,处理了大量无用数据
]);
// 正确:$match 在管道前面
db.orders.aggregate([
{ $match: { status: "completed", total_amount: { $gt: 1000 } } }, // 先过滤
{ $group: { _id: "$user_id", total: { $sum: "$total_amount" } } } // 再分组
]);索引优化
// 为 $match 阶段创建索引
db.orders.createIndex({ status: 1, created_at: -1 });
db.orders.createIndex({ user_id: 1, status: 1 });
db.orders.createIndex({ "items.product_id": 1 });
// 查看聚合执行计划
db.orders.explain("executionStats").aggregate([
{ $match: { status: "completed", user_id: ObjectId("...") } },
{ $group: { _id: "$user_id", total: { $sum: "$total_amount" } } }
]);
// 关注输出:
// - IXSCAN: 命中索引(好)
// - COLLSCAN: 全表扫描(需要优化)
// - DocsExamined: 扫描文档数(越少越好)
// - ExecutionTimeMillis: 执行时间内存管理
// 设置 allowDiskUse 防止内存溢出
db.orders.aggregate([...], { allowDiskUse: true });
// MongoDB 4.2+ 可以设置更大的内存限制
db.orders.aggregate([
// ... 管道阶段
], {
allowDiskUse: true,
maxTimeMS: 300000, // 5 分钟超时
cursor: { batchSize: 500 } // 控制批次大小
});
// Python 示例
// client.db.orders.aggregate(pipeline, allowDiskUse=True)$lookup 性能优化
// 普通的 $lookup(全集合扫描目标集合)
{ $lookup: {
from: "users",
localField: "user_id",
foreignField: "_id",
as: "user"
}}
// 优化的 pipeline 形式 $lookup(可以在关联集合中使用索引过滤)
{ $lookup: {
from: "users",
let: { userId: "$user_id" },
pipeline: [
{ $match: {
$expr: { $eq: ["$_id", "$$userId"] },
status: "active" // 额外过滤条件
}},
{ $project: { name: 1, email: 1 } } // 减少返回字段
],
as: "user"
}}
// 对于 $lookup 的 pipeline 形式,确保目标集合的关联字段有索引
db.users.createIndex({ _id: 1, status: 1 });实战模式
模式一:电商销售仪表板
// 一次聚合查询返回完整的销售仪表板数据
db.orders.aggregate([
{ $match: {
created_at: {
$gte: ISODate("2024-01-01"),
$lt: ISODate("2024-04-01")
}
}},
{ $facet: {
// 总体概览
overview: [
{ $group: {
_id: null,
totalRevenue: { $sum: "$total_amount" },
totalOrders: { $sum: 1 },
avgOrderValue: { $avg: "$total_amount" },
completedCount:{ $sum: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] } },
refundCount: { $sum: { $cond: [{ $eq: ["$status", "refunded"] }, 1, 0] } }
}},
{ $project: {
_id: 0,
totalRevenue: { $round: ["$totalRevenue", 2] },
totalOrders: 1,
avgOrderValue: { $round: ["$avgOrderValue", 2] },
completionRate: {
$round: [
{ $multiply: [
{ $divide: ["$completedCount", "$totalOrders"] },
100
]},
1
]
},
refundRate: {
$round: [
{ $multiply: [
{ $divide: ["$refundCount", "$totalOrders"] },
100
]},
1
]
}
}}
],
// 每日趋势
dailyTrend: [
{ $group: {
_id: {
$dateToString: { format: "%Y-%m-%d", date: "$created_at" }
},
revenue: { $sum: "$total_amount" },
orders: { $sum: 1 }
}},
{ $sort: { _id: 1 } },
{ $project: {
date: "$_id",
revenue: { $round: ["$revenue", 2] },
orders: 1,
_id: 0
}}
],
// 商品排行
topProducts: [
{ $unwind: "$items" },
{ $group: {
_id: "$items.product_id",
name: { $first: "$items.name" },
qty: { $sum: "$items.qty" },
revenue:{ $sum: { $multiply: ["$items.qty", "$items.price"] } }
}},
{ $sort: { revenue: -1 } },
{ $limit: 10 },
{ $project: { product_id: "$_id", name: 1, qty: 1, revenue: 1, _id: 0 } }
]
}}
], { allowDiskUse: true });模式二:漏斗分析
// 用户行为漏斗:浏览 -> 加购 -> 下单 -> 支付
db.events.aggregate([
// 所有浏览事件
{ $match: { action: "view", created_at: { $gte: ISODate("2024-01-01") } } },
{ $group: { _id: "$user_id", viewAt: { $min: "$created_at" } } },
// 关联加购事件
{ $lookup: {
from: "events",
let: { userId: "$_id", viewAt: "$viewAt" },
pipeline: [
{ $match: {
$expr: {
$and: [
{ $eq: ["$user_id", "$$userId"] },
{ $eq: ["$action", "cart_add"] },
{ $gte: ["$created_at", "$$viewAt"] }
]
}
}},
{ $limit: 1 }
],
as: "cartEvent"
}},
// 关联下单事件
{ $lookup: {
from: "events",
let: { userId: "$_id" },
pipeline: [
{ $match: {
$expr: {
$and: [
{ $eq: ["$user_id", "$$userId"] },
{ $eq: ["$action", "order_create"] }
]
}
}},
{ $limit: 1 }
],
as: "orderEvent"
}},
// 关联支付事件
{ $lookup: {
from: "events",
let: { userId: "$_id" },
pipeline: [
{ $match: {
$expr: {
$and: [
{ $eq: ["$user_id", "$$userId"] },
{ $eq: ["$action", "payment_success"] }
]
}
}},
{ $limit: 1 }
],
as: "payEvent"
}},
// 计算漏斗指标
{ $project: {
viewed: { $literal: true },
carted: { $gt: [{ $size: "$cartEvent" }, 0] },
ordered: { $gt: [{ $size: "$orderEvent" }, 0] },
paid: { $gt: [{ $size: "$payEvent" }, 0] }
}},
// 汇总漏斗数据
{ $group: {
_id: null,
viewCount: { $sum: { $cond: ["$viewed", 1, 0] } },
cartCount: { $sum: { $cond: ["$carted", 1, 0] } },
orderCount: { $sum: { $cond: ["$ordered", 1, 0] } },
payCount: { $sum: { $cond: ["$paid", 1, 0] } }
}},
// 计算转化率
{ $project: {
_id: 0,
viewCount: 1,
cartCount: 1,
orderCount: 1,
payCount: 1,
viewToCartRate: {
$round: [{ $multiply: [{ $divide: ["$cartCount", "$viewCount"] }, 100] }, 1]
},
cartToOrderRate: {
$round: [{ $multiply: [{ $divide: ["$orderCount", "$cartCount"] }, 100] }, 1]
},
orderToPayRate: {
$round: [{ $multiply: [{ $divide: ["$payCount", "$orderCount"] }, 100] }, 1]
},
overallConversion: {
$round: [{ $multiply: [{ $divide: ["$payCount", "$viewCount"] }, 100] }, 2]
}
}}
]);优点
缺点
总结
MongoDB 聚合管道是数据分析场景的核心工具,$match 前置 + 合理分组 + 索引配合是性能优化的关键。复杂统计优先使用 $facet 一次性完成,频繁执行的聚合可以配合 $merge 写入物化视图。在生产环境中,务必设置 allowDiskUse: true 和超时时间,避免聚合管道因内存不足或超时而失败。
关键知识点
$match尽量放在管道最前面,可以利用索引减少扫描量$group的_id为null时表示对整个集合聚合allowDiskUse: true允许管道使用临时文件,但会降低性能$lookup的 pipeline 形式(MongoDB 3.6+)支持在关联时使用$match等阶段$facet的各子管道不能互相引用,各自独立处理$merge支持增量更新,适合定时物化视图$setWindowFields支持滑动窗口、排名、累计值等窗口函数计算- 使用
explain("executionStats")分析聚合管道的执行计划
项目落地视角
- 所有聚合查询添加
allowDiskUse: true防止内存溢出 - 为
$match阶段涉及的字段创建索引 - 复杂聚合管道使用 Compass 的 Aggregation Pipeline Builder 可视化调试
- 定时聚合结果通过
$merge写入统计集合,查询走预计算结果 - 聚合管道脚本纳入版本控制,与业务代码同步管理
- 大数据量聚合设置超时时间
maxTimeMS和批次大小
常见误区
- 不在管道开头放
$match:导致后续阶段处理大量无用数据 $lookup不加限制条件:关联查询返回全部文档再过滤,性能极差- 忽略 100MB 内存限制:大数据量聚合必须设置
allowDiskUse - 过度使用
$unwind:展开大数组会产生大量中间文档 $facet中每个子管道都处理全量数据:应该在各子管道中都添加$match- 不查看执行计划就上线聚合管道:可能存在全表扫描或内存溢出
$merge不设置on字段:可能导致重复数据
进阶路线
- 学习 MongoDB Atlas 的聚合管道优化器原理
- 研究
$merge实现增量物化视图 - 了解
$function和$accumulator自定义聚合操作 - 学习 MongoDB 5.0+ 的窗口函数(
$setWindowFields) - 掌握
$changeStream实现实时数据聚合 - 研究分片集群上的聚合优化策略
适用场景
- 报表统计:按时间/分类/区域等维度聚合数据
- 数据 ETL:清洗、转换、关联多集合数据
- 实时大屏:
$facet一次查询返回多种统计维度 - 漏斗分析:用户行为路径转化率统计
- 组织架构:
$graphLookup递归查询上下级关系 - 物化视图:
$merge定时预计算高频统计
落地建议
- 使用 Compass 的 Aggregation Pipeline Builder 调试管道
- 聚合管道脚本纳入版本控制,与业务代码同步管理
- 大数据量聚合设置超时时间
maxTimeMS - 为高频聚合创建物化视图,减少实时计算压力
- 使用
explain验证聚合管道是否命中索引 - 在测试环境用生产级数据量验证聚合管道性能
- 将聚合管道封装为数据库视图或存储函数,统一管理
排错清单
- 在每个阶段后添加
$limit: 1逐阶段检查输出 - 使用
db.collection.explain("executionStats")查看聚合执行计划 - 检查
$match阶段是否命中索引(IXSCAN vs COLLSCAN) - 检查是否有阶段产生了大量中间文档(
$unwind、$lookup) - 检查
$facet子管道是否都添加了$match过滤 - 检查聚合是否因内存超限失败(需设置
allowDiskUse) - 检查
$lookup是否使用了 pipeline 形式以利用索引
复盘问题
- 最耗时的聚合管道是什么?哪个阶段是瓶颈?
- 是否有聚合查询超出 100MB 内存限制?是否需要优化?
- 哪些频繁执行的聚合可以预计算为物化视图?
$lookup是否使用了 pipeline 形式?关联字段是否有索引?- 团队是否熟悉聚合管道的调试工具和方法?
- 聚合管道是否有版本兼容性问题?
