MongoDB 聚合管道中的 (Stages 和 Operators)
1. 为何聚合管道效率更高#
1.1. 普通查询#
var posts = await _postModel.Find(_ => true).ToListAsync();
List<CommentModel> allComments = new List<CommentModel>();
foreach (var post in posts)
{
if (post.CommentList != null && post.CommentList.Any())
{
allComments.AddRange(post.CommentList);
}
}
return allComments;
❗1. 传输的数据体积大
ToListAsync()
会把整条文档加载进内存,包括帖子内容、标题、标签、图片等一堆无关字段- 比如每个帖子 1MB,1000 个帖子就是 1GB 网络传输量,哪怕你只要其中的评论
❗2. 内存占用高
- 所有数据都拉进内存,哪怕你只关心里面一小块数据(比如
CommentList
) - 对于大集合、高并发服务,这会造成严重的资源消耗
❗3. 处理逻辑写在代码层,效率低
- 你要在 C# 层手动拆解、遍历、组合这些数据,相当于自己在模拟数据库的工作,性能远不如数据库原生操作
1.2. 聚合查询#
var pipeline = new BsonDocument[]
{
// 第一步:展开 CommentList 数组
// 对每个 Post 文档,将 CommentList 中的每条评论都“拆开”,每条评论单独形成一条文档
new BsonDocument("$unwind",
new BsonDocument("path", "$CommentList")
.Add("preserveNullAndEmptyArrays", false)), // 如果 CommentList 是 null 或空数组,则跳过该文档
// 第二步:投影出我们感兴趣的字段
// 我们只要评论内容,把它包装成 "comment" 字段,同时去掉 _id
new BsonDocument("$project",
new BsonDocument("_id", 0) // 不要 MongoDB 的 ObjectId 字段
.Add("comment", "$CommentList")), // 把 CommentList 的当前项命名为 comment
// 第三步:把 comment 作为根节点
// 把整条文档替换为 comment 字段的内容,相当于去掉了外层包裹
new BsonDocument("$replaceRoot",
new BsonDocument("newRoot", "$comment"))
};
// 执行聚合管道,最终返回的是一组 CommentModel 类型的列表
var comments = await _postModel.Aggregate<CommentModel>(pipeline).ToListAsync();
return comments;
✅1. 只取需要的数据
- 聚合管道中用
$project
和$replaceRoot
精确过滤出「你想要的字段」,减少网络负担
✅2. 由数据库高效处理数据结构
- MongoDB 内部做展开、映射、过滤,使用原生的 C++ 引擎,速度远远快于 C# 遍历
✅3. 节省内存与计算资源
- 不需要把整条大文档拉进来,只需要从 MongoDB 拿你想要的部分,C# 层代码也变得非常轻量
✅4. 可以在聚合中做更复杂操作
- 想做排序、过滤、分页等操作?聚合支持
$match
、$sort
、$limit
,让数据库帮你完成这些逻辑
2. unwind
, project
, replaceRoot
#
在上面的聚合管道中, 我们用了:
$unwind: "$CommentList"
$project: { comment: "$CommentList" }
$replaceRoot: { newRoot: "$comment" }
MongoDB 会把每个评论都单独展开成一条结果文档,像这样:
[
{ "Author": "小明", "Content": "写得不错", "CreatedAt": "..." },
{ "Author": "小红", "Content": "顶一个", "CreatedAt": "..." },
...
]
每条结果就是一个纯粹的 CommentModel
,所以:聚合后返回的每一条记录是一个 CommentModel
实例,组成的整体就是一个 List<CommentModel>
假设有一个帖子集合 posts
,每条文档结构如下:
{
"_id": "post1",
"title": "MongoDB 聚合示例",
"CommentList": [
{ "Author": "Alice", "Content": "Nice post!" },
{ "Author": "Bob", "Content": "I agree!" }
]
}
我们目标是:从整个 posts 集合中,提取出每一条独立的评论(CommentModel),不带其他字段
2.1. 第一步 $unwind
—— 展开数组字段#
✅ 语法:
{ "$unwind": { "path": "$CommentList", "preserveNullAndEmptyArrays": false } }
✅ 参数解释:
path
: 要展开的数组字段,必须是数组(这里是CommentList
)preserveNullAndEmptyArrays
: 是否保留空数组或不存在该字段的文档false
表示不保留(只处理有评论的文档)
✅ 效果:
这一步的作用是:
如果一个文档中的
CommentList
是一个数组, 它会把数组里的每个元素拆分为一条新的文档
🔁 原始数据:
{
"_id": "post1",
"CommentList": [
{ "Author": "Alice", "Content": "Nice post!" },
{ "Author": "Bob", "Content": "I agree!" }
]
}
📤 经过 $unwind
后变成两条记录:
{
"_id": "post1",
"CommentList": { "Author": "Alice", "Content": "Nice post!" }
},
{
"_id": "post1",
"CommentList": { "Author": "Bob", "Content": "I agree!" }
}
每条结果都只包含一个评论对象在 CommentList
字段中(已经不是数组了)
2.2. 第二步 $project
—— 保留我们关心的字段,并重新命名#
✅ 语法:
{ "$project": { "_id": 0, "comment": "$CommentList" } }
✅ 参数解释:
-
_id: 0
:不显示 MongoDB 默认的_id
字段 -
"comment": "$CommentList"
:把CommentList
字段的内容赋值给一个新字段comment
🔁 上一步输出:
{
"_id": "post1",
"CommentList": { "Author": "Alice", "Content": "Nice post!" }
}
📤 变成:
{
"comment": { "Author": "Alice", "Content": "Nice post!" }
}
💡 我们只保留了评论这部分数据,字段名变成了 comment
,更好处理下一步的结构变换
2.3. 第三步 $replaceRoot
—— 让 comment 成为新文档的根部#
✅ 语法:
{ "$replaceRoot": { "newRoot": "$comment" } }
✅ 参数解释
newRoot
: 用哪个字段的值替换掉当前文档的根
🔁 上一步输出:
{
"comment": { "Author": "Alice", "Content": "Nice post!" }
}
📤 变成:
{
"Author": "Alice",
"Content": "Nice post!"
}
💡 comment
字段里的内容被提取成了顶层字段,正是你最终想要的结构:一个干净的 CommentModel
3. $group
, $sum
#
假设我们有一个集合 orders
,它存储了客户订单,结构大概如下:
[
{ "_id": 1, "orderStatus": "pending", "amount": 100, "customer": "Alice" },
{ "_id": 2, "orderStatus": "processing", "amount": 200, "customer": "Bob" },
{ "_id": 3, "orderStatus": "completed", "amount": 150, "customer": "Charlie" },
{ "_id": 4, "orderStatus": "pending", "amount": 300, "customer": "Dave" },
{ "_id": 5, "orderStatus": "completed", "amount": 250, "customer": "Eve" },
{ "_id": 6, "orderStatus": "processing", "amount": 400, "customer": "Frank" }
]
目标是通过聚合管道统计每个订单状态(orderStatus
)的订单数量,例如有多少订单是 pending
、processing
和 completed
, 使用一个 $group
聚合阶段配合一个聚合运算符 $sum
就可以实现:
{
"$group": {
"_id": "$orderStatus",
"count": { "$sum": 1 }
}
}
"_id": "$orderStatus"
告诉 MongoDB 根据orderStatus
字段的值将文档分组- MongoDB 会扫描集合中的每个文档,读取
orderStatus
的值,并将具有相同orderStatus
值的文档归为一组 $sum: 1
的计数过程- 对于每个分组,MongoDB 创建一个新的文档,包含
_id
(分组键的值,例如 “pending”)和count
字段(由 $sum: 1 定义)count
初始值为 0 - 也就是说, MongoDB 会创建三个文档(因为一共有三个状态), 每个文档的分组键
_id
的值都是对应的状态值 - MongoDB 按顺序(或优化后的顺序)遍历集合中的每个文档,检查其
orderStatus
,并将其分配到对应的分组 - 对于每个文档,
$sum: 1
表示将该文档的count
字段的值增加 1, 换句话说,每个文档为它所在分组的count
贡献 1
- 对于每个分组,MongoDB 创建一个新的文档,包含
经过 $group
阶段,数据被转换为以下形式:
[
{ "_id": "pending", "count": 2 },
{ "_id": "processing", "count": 2 },
{ "_id": "completed", "count": 2 }
]
4. Operators vs Stages#
4.1. 聚合阶段(Aggregation Stages)#
构成聚合管道的“每一步”
聚合阶段 | 功能 |
---|---|
$match |
过滤文档(类似 SQL 的 WHERE ) |
$group |
分组并进行聚合计算(类似 SQL 的 GROUP BY ) |
$project |
投影字段(类似 SQL 的 SELECT column AS ... ) |
$sort |
排序 |
$limit / $skip |
分页 |
$lookup |
类似 SQL 的 JOIN |
4.2. 聚合运算符(Aggregation Operators)#
在某些阶段内部使用的“函数”,比如 $group
阶段里面常用的:
聚合运算符 | 用法 | 类似 SQL 函数 |
---|---|---|
$sum |
求和或计数 | SUM() / COUNT() |
$avg |
求平均 | AVG() |
$min / $max |
最小/最大值 | MIN() / MAX() |
5. $sum
vs $group
#
$sum
是聚合运算符, 与 $group
聚合阶段不同, 一个查询聚合管道由多个聚合阶段组成, 每个聚合阶段都定义了一个新的文档, 可以理解为每个聚合阶段 stage 都会对输入的文档流进行处理,并生成一个新的文档流(可以是转换后的文档、过滤后的文档、分组后的文档等)
5.1. 业务逻辑#
[
{ "_id": 1, "product": "Laptop", "category": "Electronics", "price": 1000, "quantity": 2, "orderDate": "2023-01-10", "region": "North" },
{ "_id": 2, "product": "Phone", "category": "Electronics", "price": 500, "quantity": 5, "orderDate": "2023-01-15", "region": "South" },
{ "_id": 3, "product": "Desk", "category": "Furniture", "price": 200, "quantity": 1, "orderDate": "2023-02-01", "region": "North" },
{ "_id": 4, "product": "Chair", "category": "Furniture", "price": 100, "quantity": 4, "orderDate": "2023-02-10", "region": "South" },
{ "_id": 5, "product": "Tablet", "category": "Electronics", "price": 300, "quantity": 3, "orderDate": "2023-03-01", "region": "North" }
]
我们想分析 2023 年 1 月和 2 月的销售数据,筛选出 Electronics
类别,按 region
分组,计算每个地区的总销售额(price * quantity
),并按总销售额降序排序,输出特定字段, 以下是对应的聚合管道(用 MongoDB 的 BSON
格式表示)
[
{
"$match": {
"category": "Electronics",
"orderDate": { "$gte": "2023-01-01", "$lte": "2023-02-28" }
}
},
{
"$project": {
"region": 1,
"totalSale": { "$multiply": ["$price", "$quantity"] }
}
},
{
"$group": {
"_id": "$region",
"totalRegionSale": { "$sum": "$totalSale" }
}
},
{
"$sort": {
"totalRegionSale": -1
}
}
]
5.2. 初始数据 输入文档结构#
{
"_id": ObjectId,
"product": String,
"category": String,
"price": Number,
"quantity": Number,
"orderDate": String,
"region": String
}
5.3. 阶段 1: $match
#
过滤文档,只保留 category
为 "Electronics"
且 orderDate
在 2023 年 1 月至 2 月之间的文档
{
"$match": {
"category": "Electronics",
"orderDate": { "$gte": "2023-01-01", "$lte": "2023-02-28" }
}
}
输出文档结构:与输入相同,因为 $match 只过滤文档,不改变结构
5.4. 阶段 2: $project
#
操作:选择并转换字段,只保留 region
字段,并计算每个订单的总销售额(totalSale = price * quantity
)
{
"$project": {
"region": 1,
"totalSale": { "$multiply": ["$price", "$quantity"] }
}
}
输入文档结构:阶段 1 的输出(包含 _id
、 product
、 category
等)
输出文档结构:
region
: 保留原始 region 字段totalSale
: 新字段,计算price * quantity
- 注意:默认情况下,
_id
字段会保留,除非明确设置为"_id": 0
{
"_id": ObjectId,
"region": String,
"totalSale": Number
}
输出数据:
[
{ "_id": 1, "region": "North", "totalSale": 2000 },
{ "_id": 2, "region": "South", "totalSale": 2500 }
]
5.5. 阶段 3: $group
#
- 操作:按
region
分组,计算每个地区的totalSale
总和(totalRegionSale
)
{
"$group": {
"_id": "$region",
"totalRegionSale": { "$sum": "$totalSale" }
}
}
输入文档结构:阶段 2 的输出(包含 _id
、 region
、 totalSale
)
输出文档结构:
-
_id
: 分组键,即region
的值 -
totalRegionSale
: 每个组的totalSale
字段总和(由$sum
计算)
{
"_id": String,
"totalRegionSale": Number
}
输出数据:
[
{ "_id": "North", "totalRegionSale": 2000 },
{ "_id": "South", "totalRegionSale": 2500 }
]
…