An aggregation pipeline provides better performance and usability than a map-reduce operation.
Map-reduce operations can be rewritten using aggregation pipeline operators, such as $group, $merge, and others.
For map-reduce operations that require custom functionality, MongoDB provides the $accumulator and $function aggregation operators starting in version 4.4. Use these operators to define custom aggregation expressions in JavaScript.
Map-reduce expressions can be rewritten as shown in the following sections.
The table is only an approximate translation. For instance, the table shows an approximate translation of mapFunction using the $project stage.
However, the mapFunction logic may require additional stages, for example when the logic iterates over an array:
function() { this.items.forEach(function(item){ emit(item.sku, 1); }); }
Then, the aggregation pipeline includes an $unwind and a $project:
{ $unwind: "$items" },
{ $project: { emits: { key: "$items.sku", value: { $literal: 1 } } } },
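The correspondence between the forEach-based map function and the $unwind plus $project stages can be checked outside the database. The following is a minimal sketch in plain JavaScript, not MongoDB code: the sample documents, the emit collector, and the helper names runMap, unwindItems, and projectEmits are assumptions made for illustration.

```javascript
// Hypothetical sample documents; only the items array matters here.
const docs = [
  { _id: 1, items: [ { sku: "oranges" }, { sku: "apples" } ] },
  { _id: 2, items: [ { sku: "oranges" } ] }
];

// Map-reduce side: run the map function against each document and
// collect every emit(key, value) call.
function runMap(documents) {
  const emitted = [];
  const emit = (key, value) => emitted.push({ key, value });
  const mapFunction = function () {
    this.items.forEach(function (item) { emit(item.sku, 1); });
  };
  documents.forEach((doc) => mapFunction.call(doc));
  return emitted;
}

// Pipeline side: emulate { $unwind: "$items" } by producing one
// document per array element.
function unwindItems(documents) {
  return documents.flatMap((doc) =>
    doc.items.map((item) => ({ ...doc, items: item })));
}

// Emulate the $project stage that builds the emits subdocument.
function projectEmits(documents) {
  return documents.map((doc) => ({
    _id: doc._id,
    emits: { key: doc.items.sku, value: 1 }
  }));
}

const fromMap = runMap(docs);
const fromPipeline = projectEmits(unwindItems(docs)).map((d) => d.emits);

// Both sides should yield the same sequence of key/value pairs.
console.log(JSON.stringify(fromMap) === JSON.stringify(fromPipeline));
```

In this sketch each array element becomes its own document before the projection runs, which is why a single $project stage cannot replace a map function that loops over an array.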
The emits field in $project may be named something else; the name emits was chosen here.

| Map-Reduce | Aggregation Pipeline |
|---|---|
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: <collection>
}
)
| db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} },
{ $out: <collection> }
] )
|
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { replace: <collection>, db:<db> }
}
)
| db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} },
{ $out: { db: <db>, coll: <collection> } }
] )
|
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { merge: <collection>, db: <db> }
}
)
| db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} },
{ $merge: {
into: { db: <db>, coll: <collection> },
on: "_id",
whenMatched: "replace",
whenNotMatched: "insert"
} },
] )
|
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { reduce: <collection>, db: <db> }
}
)
| db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} },
{ $merge: {
into: { db: <db>, coll: <collection> },
on: "_id",
whenMatched: [
{ $project: {
value: { $function: {
body: <reduceFunction>,
args: [
"$_id",
[ "$value", "$$new.value" ]
],
lang: "js"
} }
} }
],
whenNotMatched: "insert"
} },
] )
|
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { inline: 1 }
}
)
| db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} }
] )
|
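The $accumulator fields used throughout the table follow a fixed lifecycle: init creates a state per group, accumulate folds each document's arguments into that state, merge combines states that were computed separately, and finalize shapes the result. The sketch below emulates that lifecycle in plain JavaScript for intuition only. The helpers accumulatePartition and groupWithAccumulator, the two-partition split, and the sample data are assumptions, and the code does not claim to reflect how the server is implemented.

```javascript
// Hypothetical accumulator definition, mirroring the fields of $accumulator.
const accumulator = {
  init: () => ({ count: 0, qty: 0 }),
  accumulate: (state, value) => ({
    count: state.count + value.count,
    qty: state.qty + value.qty
  }),
  merge: (a, b) => ({ count: a.count + b.count, qty: a.qty + b.qty }),
  finalize: (state) => ({ ...state, avg: state.qty / state.count })
};

// Fold one partition of { k, v } pairs into a Map of key -> state.
function accumulatePartition(pairs, acc) {
  const states = new Map();
  for (const { k, v } of pairs) {
    const state = states.has(k) ? states.get(k) : acc.init();
    states.set(k, acc.accumulate(state, v));
  }
  return states;
}

// Emulate $group: accumulate each partition independently, merge the
// partial states per key, then finalize each merged state.
function groupWithAccumulator(partitions, acc) {
  const merged = new Map();
  for (const pairs of partitions) {
    for (const [k, state] of accumulatePartition(pairs, acc)) {
      merged.set(k, merged.has(k) ? acc.merge(merged.get(k), state) : state);
    }
  }
  return [...merged].map(([k, state]) => ({ _id: k, value: acc.finalize(state) }));
}

// Assumed input: emitted pairs split across two partitions.
const partitions = [
  [ { k: "a", v: { count: 1, qty: 4 } }, { k: "b", v: { count: 1, qty: 2 } } ],
  [ { k: "a", v: { count: 1, qty: 6 } } ]
];
const grouped = groupWithAccumulator(partitions, accumulator);
console.log(grouped);
```

Because partial states may be merged in any grouping, the reduce logic reused for accumulate and merge should give the same answer regardless of how the input is split.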
Various map-reduce expressions can be rewritten using aggregation pipeline operators, such as $group, $merge, and others, without requiring custom functions. However, for illustrative purposes, the following examples provide both alternatives.
The following map-reduce operation on the orders collection groups by cust_id and calculates the sum of the price for each cust_id:
var mapFunction1 = function() {
   emit(this.cust_id, this.price);
};

var reduceFunction1 = function(keyCustId, valuesPrices) {
   return Array.sum(valuesPrices);
};

db.orders.mapReduce(
   mapFunction1,
   reduceFunction1,
   { out: "map_reduce_example" }
)
Alternative 1: (Recommended)
You can rewrite the operation into an aggregation pipeline without translating the map-reduce functions to equivalent pipeline stages:
db.orders.aggregate([
{ $group: { _id: "$cust_id", value: { $sum: "$price" } } },
{ $out: "agg_alternative_1" }
])
Alternative 2: (For illustrative purposes only)
The following aggregation pipeline provides a translation of the various map-reduce functions, using $accumulator to define custom functions:
db.orders.aggregate( [
{ $project: { emit: { key: "$cust_id", value: "$price" } } }, // equivalent to the map function
{ $group: {
// equivalent to the reduce function
_id: "$emit.key",
valuesPrices: { $accumulator: {
init: function() { return 0; },
initArgs: [],
accumulate: function(state, value) { return state + value; },
accumulateArgs: [ "$emit.value" ],
merge: function(state1, state2) { return state1 + state2; },
lang: "js"
} }
} },
{ $out: "agg_alternative_2" }
] )
First, the $project stage outputs documents with an emit field. The emit field is a document with the fields:
- key that contains the cust_id value for the document
- value that contains the price value for the document

{ "_id" : 1, "emit" : { "key" : "Ant O. Knee", "value" : 25 } }
{ "_id" : 2, "emit" : { "key" : "Ant O. Knee", "value" : 70 } }
{ "_id" : 3, "emit" : { "key" : "Busby Bee", "value" : 50 } }
{ "_id" : 4, "emit" : { "key" : "Busby Bee", "value" : 25 } }
{ "_id" : 5, "emit" : { "key" : "Busby Bee", "value" : 50 } }
{ "_id" : 6, "emit" : { "key" : "Cam Elot", "value" : 35 } }
{ "_id" : 7, "emit" : { "key" : "Cam Elot", "value" : 25 } }
{ "_id" : 8, "emit" : { "key" : "Don Quis", "value" : 75 } }
{ "_id" : 9, "emit" : { "key" : "Don Quis", "value" : 55 } }
{ "_id" : 10, "emit" : { "key" : "Don Quis", "value" : 25 } }
Then, the $group uses the $accumulator operator to add the emitted values:
{ "_id" : "Don Quis", "valuesPrices" : 155 }
{ "_id" : "Cam Elot", "valuesPrices" : 60 }
{ "_id" : "Ant O. Knee", "valuesPrices" : 95 }
{ "_id" : "Busby Bee", "valuesPrices" : 125 }
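The grouped totals can be reproduced by hand from the ten emitted documents listed above. The following plain-JavaScript sketch applies the same init and accumulate logic as the pipeline's $accumulator to those key and value pairs. It runs outside MongoDB, and the names emitted and totals are illustrative.

```javascript
// The emit subdocuments produced by the $project stage, as listed above.
const emitted = [
  { key: "Ant O. Knee", value: 25 },
  { key: "Ant O. Knee", value: 70 },
  { key: "Busby Bee", value: 50 },
  { key: "Busby Bee", value: 25 },
  { key: "Busby Bee", value: 50 },
  { key: "Cam Elot", value: 35 },
  { key: "Cam Elot", value: 25 },
  { key: "Don Quis", value: 75 },
  { key: "Don Quis", value: 55 },
  { key: "Don Quis", value: 25 }
];

// Same logic as the init and accumulate functions in the pipeline.
const init = () => 0;
const accumulate = (state, value) => state + value;

// Group by key, starting each group from init() and folding in each value.
const totals = {};
for (const { key, value } of emitted) {
  const state = key in totals ? totals[key] : init();
  totals[key] = accumulate(state, value);
}

console.log(totals);
```

The per-customer sums computed this way agree with the valuesPrices values in the $group output.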
Finally, the $out writes the output to the collection agg_alternative_2. Alternatively, you could use $merge instead of $out.

The following map-reduce operation on the orders collection groups by the item.sku field and calculates the number of orders and the total quantity ordered for each sku. The operation then calculates the average quantity per order for each sku value and merges the results into the output collection.
var mapFunction2 = function() {
   for (var idx = 0; idx < this.items.length; idx++) {
      var key = this.items[idx].sku;
      var value = { count: 1, qty: this.items[idx].qty };
      emit(key, value);
   }
};

var reduceFunction2 = function(keySKU, countObjVals) {
   // Declared with var so the accumulator does not leak into the global scope.
   var reducedVal = { count: 0, qty: 0 };
   for (var idx = 0; idx < countObjVals.length; idx++) {
      reducedVal.count += countObjVals[idx].count;
      reducedVal.qty += countObjVals[idx].qty;
   }
   return reducedVal;
};

var finalizeFunction2 = function (key, reducedVal) {
   reducedVal.avg = reducedVal.qty / reducedVal.count;
   return reducedVal;
};

db.orders.mapReduce(
   mapFunction2,
   reduceFunction2,
   {
      out: { merge: "map_reduce_example2" },
      query: { ord_date: { $gte: new Date("2020-03-01") } },
      finalize: finalizeFunction2
   }
);
Alternative 1: (Recommended)
You can rewrite the operation into an aggregation pipeline without translating the map-reduce functions to equivalent pipeline stages:
db.orders.aggregate( [
{ $match: { ord_date: { $gte: new Date("2020-03-01") } } },
{ $unwind: "$items" },
{ $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } },
{ $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },
{ $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )
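To see what the $unwind, $group, and $project stages in this pipeline compute, the sketch below emulates them in plain JavaScript. It is an illustration only: the three orders are a small assumed sample rather than the full collection, and the variable names unwound, groups, and results are not part of any MongoDB API.

```javascript
// Illustrative orders (a small assumed sample, not the full collection).
const orders = [
  { _id: 1, items: [ { sku: "oranges", qty: 5 }, { sku: "apples", qty: 5 } ] },
  { _id: 2, items: [ { sku: "oranges", qty: 8 }, { sku: "chocolates", qty: 5 } ] },
  { _id: 3, items: [ { sku: "oranges", qty: 10 }, { sku: "pears", qty: 10 } ] }
];

// $unwind: one document per element of the items array.
const unwound = orders.flatMap((order) =>
  order.items.map((item) => ({ _id: order._id, items: item })));

// $group: per sku, sum qty ($sum) and collect distinct order ids ($addToSet).
const groups = new Map();
for (const doc of unwound) {
  const sku = doc.items.sku;
  if (!groups.has(sku)) groups.set(sku, { qty: 0, ordersIds: new Set() });
  const group = groups.get(sku);
  group.qty += doc.items.qty;
  group.ordersIds.add(doc._id);
}

// $project: count is the size of the id set ($size), avg is qty / count ($divide).
const results = [...groups].map(([sku, group]) => ({
  _id: sku,
  value: {
    count: group.ordersIds.size,
    qty: group.qty,
    avg: group.qty / group.ordersIds.size
  }
}));

console.log(results);
```

One difference worth noting: counting distinct _id values counts orders, so an order that lists the same sku twice contributes once to count here, whereas the map function emits count: 1 for every array element.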
Alternative 2: (For illustrative purposes only)
The following aggregation pipeline provides a translation of the various map-reduce functions, using $accumulator to define custom functions:
db.orders.aggregate( [
{ $match: { ord_date: {$gte: new Date("2020-03-01") } } },
{ $unwind: "$items" },
{ $project: { emit: { key: "$items.sku", value: { count: { $literal: 1 }, qty: "$items.qty" } } } },
{ $group: {
_id: "$emit.key",
value: { $accumulator: {
init: function() { return { count: 0, qty: 0 }; },
initArgs: [],
accumulate: function(state, value) {
state.count += value.count;
state.qty += value.qty;
return state;
},
accumulateArgs: [ "$emit.value" ],
merge: function(state1, state2) {
return { count: state1.count + state2.count, qty: state1.qty + state2.qty };
},
finalize: function(state) {
state.avg = state.qty / state.count;
return state;
},
lang: "js"}
}
} },
{ $merge: {
into: "agg_alternative_4",
on: "_id",
whenMatched: "replace",
whenNotMatched: "insert"
} }
] )
The $match stage selects only those documents with ord_date greater than or equal to new Date("2020-03-01").
The $unwind stage breaks down the document by the items array field to output a document for each array element. For example:
{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" }
{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" }
{ "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" }
{ "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
{ "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
...
The $project stage outputs documents with an emit field. The emit field is a document with the fields:
- key that contains the items.sku value
- value that contains a document with the qty value and a count value

{ "_id" : 1, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 5 } } }
{ "_id" : 1, "emit" : { "key" : "apples", "value" : { "count" : 1, "qty" : 5 } } }
{ "_id" : 2, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 8 } } }
{ "_id" : 2, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } }
{ "_id" : 3, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } }
{ "_id" : 3, "emit" : { "key" : "pears", "value" : { "count" : 1, "qty" : 10 } } }
{ "_id" : 4, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } }
{ "_id" : 5, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } }
...
The $group uses the $accumulator operator to add the emitted count and qty and calculate the avg field:
{ "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
{ "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
{ "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
{ "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
Finally, the $merge writes the output to the collection agg_alternative_4. If an existing document has the same _id as the new result, the operation overwrites the existing document. Otherwise, the operation inserts the new document.
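The effect of whenMatched: "replace" combined with whenNotMatched: "insert" can be pictured as an upsert keyed on _id. The following plain-JavaScript sketch models the output collection as a Map. The mergeReplace helper and the pre-existing pears document are hypothetical and exist only to show one overwrite and one insert.

```javascript
// Emulate $merge with on: "_id", whenMatched: "replace", whenNotMatched: "insert".
// The target collection is modeled as a Map from _id to document.
function mergeReplace(target, results) {
  for (const doc of results) {
    // Map.set overwrites an existing entry and inserts a missing one,
    // which matches the replace/insert combination.
    target.set(doc._id, doc);
  }
  return target;
}

// Hypothetical existing content of the output collection.
const target = new Map([
  [ "pears", { _id: "pears", value: { count: 9, qty: 9, avg: 1 } } ]
]);

// New results: one matches an existing _id, one does not.
mergeReplace(target, [
  { _id: "pears", value: { count: 1, qty: 10, avg: 10 } },
  { _id: "apples", value: { count: 4, qty: 35, avg: 8.75 } }
]);

console.log([...target.values()]);
```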