On this page本页内容
This page describes the 本页介绍$merge stage, which outputs the aggregation pipeline results to a collection. $merge阶段,该阶段将聚合管道结果输出到集合。For the 有关将文档合并为单个文档的$mergeObjects operator, which merges documents into a single document, see $mergeObjects.$mergeObjects运算符,请参阅$mergeObjects。
$merge New in version 4.2.在版本4.2中新增。
Writes the results of the aggregation pipeline to a specified collection. 将聚合管道的结果写入指定集合。The $merge operator must be the last stage in the pipeline.$merge运算符必须是管道中的最后一个阶段。
The $merge stage:$merge阶段:
Starting in MongoDB 4.4:从MongoDB 4.4开始:
$mergePipelines with the 如果集群中的所有节点都将$merge stage can run on replica set secondary nodes if all the nodes in cluster have featureCompatibilityVersion set to 4.4 or higher and the Read Preference allows secondary reads.featureCompatibilityVersion设置为4.4或更高,并且读取首选项允许辅助读取,则带有$merge阶段的管道可以在副本集辅助节点上运行。
$merge statement are sent to secondary nodes, while the write operations occur only on the primary node.$merge语句的读操作被发送到辅助节点,而写操作只发生在主节点上。$merge operations to replica set secondary nodes. $merge操作定向到副本集辅助节点。$merge read operations running on secondary nodes.$merge读取操作的支持。For a comparison with the 有关与也将聚合结果输出到集合的$out stage which also outputs the aggregation results to a collection, see $merge and $out Comparison.$out阶段的比较,请参阅$merge和$out比较。
$merge can incorporate the pipeline results into an existing output collection rather than perform a full replacement of the collection. 可以将管道结果合并到现有的输出集合中,而不是执行集合的完全替换。This functionality allows users to create on-demand materialized views, where the content of the output collection is incrementally updated when the pipeline is run.此功能允许用户创建按需的物化视图,其中输出集合的内容在管道运行时以增量方式更新。
For more information on this use case, see On-Demand Materialized Views as well as the examples on this page.有关此用例的更多信息,请参阅按需物化视图以及本页上的示例。
Materialized views are separate from read-only views. 物化视图与只读视图是分开的。For information on creating read-only views, see read-only views.有关创建只读视图的信息,请参阅只读视图。
$merge has the following syntax:具有以下语法:
{ $merge: {
into: <collection> -or- { db: <db>, coll: <collection> },
on: <identifier field> -or- [ <identifier field1>, ...], // Optional
let: <variables>,
// Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail>
// Optional
} }
For example:例如:
{ $merge: { into: "myOutput", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
If using all default options for 如果使用$merge, including writing to a collection in the same database, you can use the simplified form:$merge的所有默认选项,包括写入同一数据库中的集合,则可以使用简化形式:
{ $merge: <collection> } // Output collection is in the same database
The $merge takes a document with the following fields:$merge获取具有以下字段的文档:
into |
| ||||||||||
on |
| ||||||||||
whenMatched |
| ||||||||||
let |
{ <variable_name_1>: <expression_1>,
...,
<variable_name_n>: <expression_n> }
| ||||||||||
whenNotMatched |
|
_id Field Generation_id字段生成If the 如果聚合管道结果的文档中不存在_id field is not present in a document from the aggregation pipeline results, the $merge stage generates it automatically._id字段,$merge阶段将自动生成该字段。
For example, in the following aggregation pipeline, 例如,在以下聚合管道中,$project excludes the _id field from the documents passed into $merge. $project从传递到$merge的文档中排除_id字段。When 当$merge writes these documents to the "newCollection", $merge generates a new _id field and value.$merge将这些文档写入"newCollection"时,$merge将生成一个新的_id字段和值。
db.sales.aggregate( [
{ $project: { _id: 0 } },
{ $merge : { into : "newCollection" } }
] )
The 如果指定的输出集合不存在,$merge operation creates a new collection if the specified output collection does not exist.$merge操作将创建新集合。
$merge writes the first document into the collection and is immediately visible.$merge将第一个文档写入集合时,将创建输出集合,并立即可见。$merge before the error will not be rolled back.$merge完成的任何写入。For a replica set or a standalone, if the output database does not exist, 对于副本集或独立数据库,如果输出数据库不存在,$merge also creates the database.$merge也会创建数据库。
For a sharded cluster, the specified output database must already exist.对于分片集群,指定的输出数据库必须已经存在。
If the output collection does not exist, 如果输出集合不存在,$merge requires the on identifier to be the _id field. $merge要求on标识符为_id字段。To use a different 要为不存在的集合使用不同的on field value for a collection that does not exist, you can create the collection first by creating a unique index on the desired field(s) first. on字段值,可以先在所需字段上创建唯一索引,然后创建集合。For example, if the output collection 例如,如果输出集合newDailySales201905 does not exist and you want to specify the salesDate field as the on identifier:newDailySales201905不存在,并且您希望将salesDate字段指定为on标识符:
db.newDailySales201905.createIndex( { salesDate: 1 }, { unique: true } )
db.sales.aggregate( [
{ $match: { date: { $gte: new Date("2019-05-01"), $lt: new Date("2019-06-01") } } },
{ $group: { _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } }, totalqty: { $sum: "$quantity" } } },
{ $project: { _id: 0, salesDate: { $toDate: "$_id" }, totalqty: 1 } },
{ $merge : { into : "newDailySales201905", on: "salesDate" } }
] )
The $merge stage can output to a sharded collection. $merge阶段可以输出到分片集合。When the output collection is sharded, 当输出集合被分片时,$merge uses the _id field and all the shard key fields as the default on identifier. $merge使用_id字段和所有分片键字段作为默认的on标识符。If you override the default, the on identifier must include all the shard key fields:如果覆盖默认值,则on标识符必须包括所有分片键字段:
{ $merge: {
into: "<shardedColl>" or { db:"<sharding enabled db>", coll: "<shardedColl>" },
on: [ "<shardkeyfield1>", "<shardkeyfield2>",... ], // Shard key fields and any additional fields
let: <variables>,
// Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail>
// Optional
} }
For example, in a database that has 例如,在已启用分片的数据库中,使用sharding enabled, use the sh.shardCollection() method to create a new sharded collection newrestaurants with the postcode field as the shard key.sh.shardCollection()方法创建一个新的分片集合newrestaurants,其中postcode字段作为分片键。
sh.enableSharding("exampledb"); // Sharding must be enabled in the database sh.shardCollection( "exampledb.newrestaurants", // Namespace of the collection to shard { postcode: 1 }, // Shard key );
The newrestaurants collection will contain documents with information on new restaurant openings by month (date field) and postcode (shard key); specifically, the on identifier is ["date", "postcode"] (the ordering of the fields does not matter). newrestaurants集合将包含关于每月新餐厅开业信息的文档(date字段)和邮政编码(分片键);具体而言,on标识符为["date", "postcode"](字段的顺序无关紧要)。Because 由于$merge requires a unique index with keys that correspond to the on identifier fields, create the unique index (the ordering of the fields do not matter): $merge需要一个唯一索引,其键对应于on标识符字段,因此创建唯一索引(字段的顺序无关紧要):[1]
use exampledb
db.newrestaurants.createIndex( { postcode: 1, date: 1 }, { unique: true } )
With the sharded collection 使用分片的集合restaurants and the unique index created, you can use $merge to output the aggregation results to this collection, matching on [ "date", "postcode" ] as in this example:restaurants和创建的唯一索引,您可以使用$merge将聚合结果输出到此集合,并在[ "date", "postcode" ]匹配,如下例所示:
use exampledb
db.openings.aggregate([
{ $group: {
_id: { date: { $dateToString: { format: "%Y-%m", date: "$date" } }, postcode: "$postcode" },
restaurants: { $push: "$restaurantName" } } },
{ $project: { _id: 0, postcode: "$_id.postcode", date: "$_id.date", restaurants: 1 } },
{ $merge: { into: "newrestaurants", "on": [ "date", "postcode" ], whenMatched: "replace", whenNotMatched: "insert" } }
])
| [1] | sh.shardCollection() method can also create a unique index on the shard key when passed the { unique: true } option if: the shard key is range-based, the collection is empty, and a unique index on the shard key doesn't already exist.{ unique: true }选项时,sh.shardCollection()方法还可以在分片键上创建唯一索引,如果:分片键基于范围,集合为空,并且分片键上的唯一索引不存在。on identifier is the shard key and another field, a separate operation to create the corresponding index is required.on标识符是分片键和另一个字段,所以需要单独的操作来创建相应的索引。 |
$merge) vs Replace Collection ($out)$merge)vs替换集合($out)$merge can replace an existing document in the output collection if the aggregation results contain a document or documents that match based on the on specification. 如果聚合结果包含基于on规范匹配的一个或多个文档,则可以替换输出集合中的现有文档。As such, 因此,如果聚合结果包括集合中所有现有文档的匹配文档,并且您为$merge can replace all documents in the existing collection if the aggregation results include matching documents for all existing documents in the collection and you specify "replace" for whenMatched.whenMatched指定了"replace",则$merge可以替换现有集合中的所有文档。
However, to replace an existing collection regardless of the aggregation results, use 但是,要替换现有集合而不管聚合结果如何,请改用$out instead.$out。
_id and Shard Key Values_id和Shard键值The 如果$merge errors if the $merge results in a change to an existing document's _id value.$merge导致现有文档的_id值发生更改,则$merge错误。
To avoid this error, if the on field does not include the 为了避免此错误,如果_id field, remove the _id field in the aggregation results to avoid the error, such as with a preceding $unset stage, and so on.on字段不包括_id字段,请在聚合结果中删除_id字段以避免错误,例如使用前面的$unset阶段,等等。
Additionally, for a sharded collection, 此外,对于分片集合,$merge also generates an error if it results in a change to the shard key value of an exising document.$merge如果导致现有文档的分片键值发生更改,也会生成错误。
Any writes completed by the $merge before the error will not be rolled back.$merge在错误之前完成的任何写入都不会回滚。
If the unique index used by 如果在聚合过程中删除了$merge for on field(s) is dropped mid-aggregation, there is no guarantee that the aggregation will be killed. on字段上的$merge使用的唯一索引,则无法保证聚合将被终止。If the aggregation continues, there is no guarantee that documents do not have duplicate 如果聚合继续,则无法保证文档不会有重复的on field values.on字段值。
If the 如果$merge attempts to write a document that violates any unique index on the output collection, the operation generates an error. $merge试图写入违反输出集合上任何唯一索引的文档,则该操作将生成错误。For example:例如:
on字段上的索引之外的唯一索引。on上的索引。whenMatchedStarting in MongoDB 4.2.2, if all of the following are true for a 从MongoDB 4.2.2开始,如果$merge stage:$merge阶段的以下所有条件均为真:
whenMatched的值是聚合管道,insert, andwhenNotMatched的值为insert,并且$merge inserts the document directly into the output collection.将文档直接插入到输出集合中。
Prior to MongoDB 4.2.2, when these conditions for a 在MongoDB 4.2.2之前,当满足$merge stage are met, the pipeline specified in the whenMatched field is executed with an empty input document. $merge阶段的这些条件时,将使用空输入文档执行whenMatched字段中指定的管道。The resulting document from the pipeline is inserted into the output collection.来自管道的结果文档被插入到输出集合中。
$merge and $out Comparison$merge和$out比较With the introduction of 随着$merge, MongoDB provides two stages, $merge and $out, for writing the results of the aggregation pipeline to a collection:$merge的引入,MongoDB提供了两个阶段$merge和$out,用于将聚合管道的结果写入集合:
$merge | $out |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
When 当$merge outputs to the same collection that is being aggregated, documents may get updated multiple times or the operation may result in an infinite loop. $merge输出到正在聚合的同一集合时,文档可能会被多次更新,或者操作可能导致无限循环。This behavior occurs when the update performed by 当$merge changes the physical location of documents stored on disk. $merge执行的更新更改存储在磁盘上的文档的物理位置时,会发生此行为。When the physical location of a document changes, 当文档的物理位置更改时,$merge may view it as an entirely new document, resulting in additional updates. $merge可能会将其视为一个全新的文档,从而导致其他更新。For more information on this behavior, see Halloween Problem.有关此行为的详细信息,请参阅万圣节问题。
Starting in MongoDB 4.4, 从MongoDB 4.4开始,$merge can output to the same collection that is being aggregated. $merge可以输出到正在聚合的相同集合。You can also output to a collection which appears in other stages of the pipeline, such as 您还可以输出到出现在管道其他阶段的集合,例如$lookup.$lookup。
Versions of MongoDB prior to 4.4 did not allow MongoDB 4.4之前的版本不允许$merge to output to the same collection as the collection being aggregated.$merge输出到与聚合的集合相同的集合。
$merge inside a transaction.$merge。 | |
$merge to output to a time series collection.$merge输出到时间序列集合。 | |
$merge stage. $merge阶段。$facet stage), this $merge stage restriction applies to the nested pipelines as well.$facet阶段),则$merge阶段限制也适用于嵌套管道。 | |
$lookup | $lookup stage's nested pipeline cannot include the $merge stage.$lookup阶段的嵌套管道不能包含$merge阶段。 |
$facet | $facet stage's nested pipeline cannot include the $merge stage.$facet阶段的嵌套管道不能包含$merge阶段。 |
$unionWith | $unionWith stage's nested pipeline cannot include the $merge stage.$unionWith阶段的嵌套管道不能包含$merge阶段。 |
"linearizable" |
|
If the output collection does not exist, the 如果输出集合不存在,$merge creates the collection.$merge将创建集合。
For example, a collection named 例如,salaries in the zoo database is populated with the employee salary and department history:zoo数据库中名为salaries的集合将填充员工工资和部门历史记录:
db.getSiblingDB("zoo").salaries.insertMany([ { "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 }, { "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 }, { "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 }, { "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 }, { "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 }, { "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 }, { "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 }, { "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 }, { "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 }, { "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 } ])
You can use the 您可以使用$group and $merge stages to initially create a collection named budgets (in the reporting database) from the data currently in the salaries collection:$group和$merge阶段从当前salaries集合中的数据创建一个名为budgets的集合(在reporting数据库中):
For a replica set or a standalone deployment, if the output database does not exist, 对于副本集或独立部署,如果输出数据库不存在,$merge also creates the database.$merge还会创建数据库。
For a sharded cluster deployment, the specified output database must already exist.对于分片集群部署,指定的输出数据库必须已经存在。
db.getSiblingDB("zoo").salaries.aggregate( [ { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } }, { $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
$groupfiscal_year and dept.fiscal_year(财政年度)和dept(部门)对工资进行分组的阶段。$merge$group stage to the budgets collection in the reporting database.$group阶段的输出写入报告数据库中的budgets(预算)集合。To view the documents in the new 要查看新budgets collection:budgets集合中的文档:
db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )
The budgets collection contains the following documents:budgets集合包含以下文档:
{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }
The following example uses the collections in the previous example.以下示例使用上一示例中的集合。
The example 示例salaries collection contains the employee salary and department history:salaries集合包含员工薪资和部门历史记录:
{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
The example 示例budgets collection contains the cumulative yearly budgets:budgets集合包含累积年度预算:
{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }
During the current fiscal year (在当前财政年度(本例中为2019 in this example), new employees are added to the salaries collection and new head counts are pre-allocated for the next year:2019年),新员工被添加到salaries集合中,并为下一年预先分配新的人数:
db.getSiblingDB("zoo").salaries.insertMany([ { "_id" : 11, employee: "Wren", dept: "Z", salary: 100000, fiscal_year: 2019 }, { "_id" : 12, employee: "Zebra", dept: "A", salary: 150000, fiscal_year: 2019 }, { "_id" : 13, employee: "headcount1", dept: "Z", salary: 120000, fiscal_year: 2020 }, { "_id" : 14, employee: "headcount2", dept: "Z", salary: 120000, fiscal_year: 2020 } ])
To update the 要更新budgets collection to reflect the new salary information, the following aggregation pipeline uses:salaries集合以反映新的薪资信息,以下聚合管道使用:
$matchfiscal_year greater than or equal to 2019.fiscal_year大于或等于2019的所有文档。$groupfiscal_year and dept.fiscal_year(财政年度)和dept(部门)对工资进行分组的阶段。$merge to write the result set to the budgets collection, replacing documents with the same _id value (in this example, a document with the fiscal year and dept). $merge将结果集写入budgets集合,替换具有相同_id值的文档(在本例中,是具有财政年度和部门的文档)。$merge inserts the new documents.$merge将插入新文档。db.getSiblingDB("zoo").salaries.aggregate( [ { $match : { fiscal_year: { $gte : 2019 } } }, { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } }, { $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
After the aggregation is run, view the documents in the 运行聚合后,查看budgets collection:budgets集合中的文档:
db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )
The budgets collection incorporates the new salary data for fiscal year 2019 and adds new documents for fiscal year 2020:budgets集合包含了2019财年的新薪资数据,并添加了2020财年的新文档:
{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 275000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 410000 }
{ "_id" : { "fiscal_year" : 2020, "dept" : "Z" }, "salaries" : 240000 }
To ensure that the 要确保$merge does not overwrite existing data in the collection, set whenMatched to keepExisting or fail.$merge不会覆盖集合中的现有数据,请将whenMatched设置为keepExisting或fail。
The example salaries collection in the zoo database contains the employee salary and department history:zoo数据库中的salaries薪资集合包含员工薪资和部门历史记录:
{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
A collection orgArchive in the reporting database contains historical departmental organization records for the past fiscal years. reporting数据库中的集合orgArchive包含过去财政年度的历史部门组织记录。Archived records should not be modified.存档记录不应修改。
{ "_id" : ObjectId("5cd8c68261baa09e9f3622be"), "employees" : [ "Ant", "Gecko" ], "dept" : "A", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622bf"), "employees" : [ "Ant", "Bee" ], "dept" : "A", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622c0"), "employees" : [ "Bee", "Cat" ], "dept" : "Z", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5cd8c68261baa09e9f3622c1"), "employees" : [ "Cat" ], "dept" : "Z", "fiscal_year" : 2017 }
The orgArchive collection has a unique compound index on the fiscal_year and dept fields. orgArchive集合在fiscal_year和dept字段上具有唯一的复合索引。Specifically, there should be at most one record for the same fiscal year and department combination:具体而言,同一财政年度和部门组合最多应有一个记录:
db.getSiblingDB("reporting").orgArchive.createIndex ( { fiscal_year: 1, dept: 1 }, { unique: true } )
At the end of current fiscal year (在当前财政年度结束时(本例中为2019 in this example), the salaries collection contain the following documents:2019年),salaries集合包含以下文件:
{ "_id" : 1, "employee" : "Ant", "dept" : "A", "salary" : 100000, "fiscal_year" : 2017 }
{ "_id" : 2, "employee" : "Bee", "dept" : "A", "salary" : 120000, "fiscal_year" : 2017 }
{ "_id" : 3, "employee" : "Cat", "dept" : "Z", "salary" : 115000, "fiscal_year" : 2017 }
{ "_id" : 4, "employee" : "Ant", "dept" : "A", "salary" : 115000, "fiscal_year" : 2018 }
{ "_id" : 5, "employee" : "Bee", "dept" : "Z", "salary" : 145000, "fiscal_year" : 2018 }
{ "_id" : 6, "employee" : "Cat", "dept" : "Z", "salary" : 135000, "fiscal_year" : 2018 }
{ "_id" : 7, "employee" : "Gecko", "dept" : "A", "salary" : 100000, "fiscal_year" : 2018 }
{ "_id" : 8, "employee" : "Ant", "dept" : "A", "salary" : 125000, "fiscal_year" : 2019 }
{ "_id" : 9, "employee" : "Bee", "dept" : "Z", "salary" : 160000, "fiscal_year" : 2019 }
{ "_id" : 10, "employee" : "Cat", "dept" : "Z", "salary" : 150000, "fiscal_year" : 2019 }
{ "_id" : 11, "employee" : "Wren", "dept" : "Z", "salary" : 100000, "fiscal_year" : 2019 }
{ "_id" : 12, "employee" : "Zebra", "dept" : "A", "salary" : 150000, "fiscal_year" : 2019 }
{ "_id" : 13, "employee" : "headcount1", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
{ "_id" : 14, "employee" : "headcount2", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
To update the 要更新orgArchive collection to include the fiscal year 2019 that has just ended, the following aggregation pipeline uses:orgArchive集合以包括刚刚结束的2019财年,以下聚合管道使用:
$matchfiscal_year equal to 2019.fiscal_year等于2019年的所有文档。$groupfiscal_year and dept.fiscal_year和dept对员工进行分组的阶段。$project_id field and add separate dept and fiscal_year field. _id字段,并添加单独的dept和fiscal_year字段。$merge, $merge automatically generates a new _id field for the documents.$merge时,$merge会自动为文档生成一个新的_id字段。$merge to write the result set to 将结果集写入orgArchive.orgArchive。
The $merge stage matches documents on the 阶段匹配dept and fiscal_year fields and fails when matched. dept和fiscal_year字段中的文档,匹配失败。That is, if a document already exists for the same department and fiscal year, the 也就是说,如果同一部门和财政年度的文档已经存在,则$merge errors.$merge错误。
db.getSiblingDB("zoo").salaries.aggregate( [ { $match: { fiscal_year: 2019 }}, { $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, employees: { $push: "$employee" } } }, { $project: { _id: 0, dept: "$_id.dept", fiscal_year: "$_id.fiscal_year", employees: 1 } }, { $merge : { into : { db: "reporting", coll: "orgArchive" }, on: [ "dept", "fiscal_year" ], whenMatched: "fail" } } ] )
After the operation, the 操作后,orgArchive collection contains the following documents:orgArchive集合包含以下文档:
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419f"), "employees" : [ "Ahn", "Bess" ], "dept" : "A", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419e"), "employees" : [ "Ahn", "Gee" ], "dept" : "A", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438e"), "employees" : [ "Ahn", "Zeb" ], "dept" : "A", "fiscal_year" : 2019 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc41a0"), "employees" : [ "Carl" ], "dept" : "Z", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc41a1"), "employees" : [ "Bess", "Carl" ], "dept" : "Z", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438d"), "employees" : [ "Bess", "Carl", "Wen" ], "dept" : "Z", "fiscal_year" : 2019 }
If the 如果orgArchive collection already contained a document for 2019 for department "A" and/or "B", the aggregation fails because of the duplicate key error. orgArchive集合已包含部门"A"和/或"B"的2019年文档,则由于重复密钥错误,聚合失败。However, any document inserted before the error will not be rolled back.但是,错误之前插入的任何文档都不会回滚。
If you specify keepExisting for the matching document, the aggregation does not affect the matching document and does not error with duplicate key error. 如果为匹配文档指定keepExisting,则聚合不会影响匹配文档,并且不会出现重复键错误。Similarly, if you specify replace, the operation would not fail; however, the operation would replace the existing document.类似地,如果指定替换,则操作不会失败;但是,该行动将取代现有文件。
By default, if a document in the aggregation results matches a document in the collection, the 默认情况下,如果聚合结果中的文档与集合中的文档匹配,$merge stage merges the documents.$merge阶段将合并这些文档。
An example collection 示例集合purchaseorders is populated with the purchase order information by quarter and regions:purchaseorders按季度和地区填充采购订单信息:
db.purchaseorders.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 200, reportDate: new Date("2019-04-01") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 300, reportDate: new Date("2019-04-01") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 700, reportDate: new Date("2019-04-01") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 300, reportDate: new Date("2019-07-01") },
{ _id: 5, quarter: "2019Q2", region: "C", qty: 1000, reportDate: new Date("2019-07-01") },
{ _id: 6, quarter: "2019Q2", region: "A", qty: 400, reportDate: new Date("2019-07-01") },
] )
Another example collection 另一个示例集合reportedsales is populated with the reported sales information by quarter and regions:reportedsales按季度和地区填充报告的销售信息:
db.reportedsales.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 400, reportDate: new Date("2019-04-02") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 550, reportDate: new Date("2019-04-02") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 1000, reportDate: new Date("2019-04-05") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 500, reportDate: new Date("2019-07-02") },
] )
Assume that, for reporting purposes, you want to view the data by quarter in the following format:假设出于报告目的,您希望按季度以以下格式查看数据:
{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }
You can use the 您可以使用$merge to merge in results from the purchaseorders collection and the reportedsales collection to create a new collection quarterlyreport.$merge合并purchaseorders集合和reportedsales集合的结果,以创建新的集合quarterlyreport。
To create the 要创建quarterlyreport collection, you can use the following pipeline:quarterlyreport集合,可以使用以下管道:
db.purchaseorders.aggregate( [
{ $group: { _id: "$quarter", purchased: { $sum: "$qty" } } }, // group purchase orders by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
The $group stage groups by the quarter and uses $sum to add the qty fields into a new purchased field. $group阶段按季度分组,并使用$sum将qty字段添加到新的purchased(采购)字段中。For example:例如:
To create the 要创建quarterlyreport collection, you can use this pipeline:quarterlyreport集合,可以使用以下管道:
{ "_id" : "2019Q2", "purchased" : 1700 }
{ "_id" : "2019Q1", "purchased" : 1200 }
$merge stage writes the documents to the quarterlyreport collection in the same database. $merge阶段将文档写入同一数据库中的quarterlyreport集合。_id field, the stage merges the matching documents. _id字段匹配的现有文档,则阶段将合并匹配文档。To view the documents in the collection, run the following operation:要查看集合中的文档,请运行以下操作:
db.quarterlyreport.find().sort( { _id: 1 } )
The collection contains the following documents:该集合包含以下文档:
{ "_id" : "2019Q1", "sales" : 1200, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 1700, "purchased" : 1700 }
Similarly, run the following aggregation pipeline against the 类似地,对reportedsales collection to merge the sales results into the quarterlyreport collection.reportedsales集合运行以下聚合管道,以将销售结果合并到quarterlyreport集合中。
db.reportedsales.aggregate( [
{ $group: { _id: "$quarter", sales: { $sum: "$qty" } } }, // group sales by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
The $group stage groups by the quarter and uses $sum to add the qty fields into a new sales field. $group阶段按季度分组,并使用$sum将qty字段添加到新的sales字段中。For example:例如:
{ "_id" : "2019Q2", "sales" : 500 }
{ "_id" : "2019Q1", "sales" : 1950 }
$merge stage writes the documents to the quarterlyreport collection in the same database. $merge阶段将文档写入同一数据库中的quarterlyreport集合。_id field (the quarter), the stage merges the matching documents. _id字段(季度)匹配的现有文档,则阶段合并匹配的文档。To view the documents in the 要在合并数据后查看quarterlyreport collection after the data has been merged, run the following operation:quarterlyreport集合中的文档,请运行以下操作:
db.quarterlyreport.find().sort( { _id: 1 } )
The collection contains the following documents:该集合包含以下文档:
{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }
The 当文档匹配时,$merge can use a custom update pipeline when documents match. $merge可以使用自定义更新管道。The whenMatched pipeline can have the following stages:whenMatched管道可以具有以下阶段:
$addFields$set$project$unset$replaceRoot$replaceWithAn example collection 使用每日投票记录填充示例votes is populated with the daily vote tally. votes集合。Create the collection with the following documents:使用以下文档创建集合:
db.votes.insertMany( [
{ date: new Date("2019-05-01"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-02"), "thumbsup" : 3, "thumbsdown" : 1 },
{ date: new Date("2019-05-03"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-04"), "thumbsup" : 2, "thumbsdown" : 2 },
{ date: new Date("2019-05-05"), "thumbsup" : 6, "thumbsdown" : 10 },
{ date: new Date("2019-05-06"), "thumbsup" : 13, "thumbsdown" : 16 }
] )
Another example collection 另一个示例集合monthlytotals has the up-to-date monthly vote totals. monthlytotals具有最新的每月投票总数。Create the collection with the following document:使用以下文档创建集合:
db.monthlytotals.insertOne(
{ "_id" : "2019-05", "thumbsup" : 26, "thumbsdown" : 31 }
)
At the end of each day, that day's votes is inserted into the 每天结束时,将当天的投票插入votes collection:votes集合:
db.votes.insertOne(
{ date: new Date("2019-05-07"), "thumbsup" : 14, "thumbsdown" : 10 }
)
You can use 您可以使用$merge with an custom pipeline to update the existing document in the collection monthlytotals:$merge和自定义管道来更新集合monthlytotals中的现有文档:
db.votes.aggregate([
{ $match: { date: { $gte: new Date("2019-05-07"), $lt: new Date("2019-05-08") } } },
{ $project: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } }, thumbsup: 1, thumbsdown: 1 } },
{ $merge: {
into: "monthlytotals",
on: "_id",
whenMatched: [
{ $addFields: {
thumbsup: { $add:[ "$thumbsup", "$$new.thumbsup" ] },
thumbsdown: { $add: [ "$thumbsdown", "$$new.thumbsdown" ] }
} } ],
whenNotMatched: "insert"
} }
])
The $match stage finds the specific day's votes. $match阶段查找特定日期的投票。For example:例如:
{ "_id" : ObjectId("5ce6097c436eb7e1203064a6"), "date" : ISODate("2019-05-07T00:00:00Z"), "thumbsup" : 14, "thumbsdown" : 10 }
The $project stage sets the _id field to a year-month string. For example:$project阶段将_id字段设置为年-月字符串。例如:
{ "thumbsup" : 14, "thumbsdown" : 10, "_id" : "2019-05" }
The $merge stage writes the documents to the monthlytotals collection in the same database. $merge阶段将文档写入同一数据库中的monthlytotals集合。If the stage finds an existing document in the collection that matches on the 如果阶段在集合中找到与_id field, the stage uses a pipeline to add the thumbsup votes and the thumbsdown votes._id字段匹配的现有文档,则阶段将使用管道添加thumbsup投票和thumbsdown投票。
thumbsup field and the thumbsdown field in the results document, the pipeline uses the $$new variable; i.e. $$new.thumbsup and $new.thumbsdown.thumbsup字段和thumbsdown字段,管道使用$$new变量;即$$new.thumbsup和$new.thumpsdown。thumbsup field and the thumbsdown field in the existing document in the collection; i.e. $thumbsup and $thumbsdown.thumbsup字段和thumbsdown字段;即$thumbsup和$thumpsdown。The resulting document replaces the existing document.生成的文档将替换现有文档。
To view documents in the 要在合并操作后查看monthlytotals collection after the merge operation, run the following operation:monthlytotals集合中的文档,请运行以下操作:
db.monthlytotals.find()
The collection contains the following document:集合包含以下文档:
{ "_id" : "2019-05", "thumbsup" : 40, "thumbsdown" : 41 }
You can use variables in the 您可以在$merge stage whenMatched field. $merge阶段whenMatched字段中使用变量。Variables must be defined before they can be used.变量必须先定义,然后才能使用。
Define variables in one or both of the following:在以下一项或两项中定义变量:
To use variables in whenMatched:要在whenMatched中使用变量:
Specify the double dollar sign ($$) prefix together with the variable name in the form 以$$<variable_name>. $$<variable_name>的形式指定双美元符号($$)前缀和变量名。For example, 例如,$$year. $$year。If the variable is set to a document, you can also include a document field in the form 如果变量设置为文档,则还可以以$$<variable_name>.<field>. $$<variable_name>.<field>的形式包含文档字段。For example, 例如,$$year.month.$$year.month。
The tabs below demonstrate behavior when variables are defined in the merge stage, the aggregate command, or both.下面的选项卡演示了在合并阶段、聚合命令或两者中定义变量时的行为。
You can define variables in the 您可以在$merge stage let and use the variables in the whenMatched field.$merge阶段let中定义变量,并在whenMatched字段中使用变量。
Example:例子:
db.cakeSales.insertOne( [ { _id: 1, flavor: "chocolate", salesTotal: 1580, salesTrend: "up" } ] ) db.runCommand( { aggregate: db.cakeSales.getName(), pipeline: [ { $merge: { into: db.cakeSales.getName(), let : { year: "2020" }, whenMatched: [ { $addFields: { "salesYear": "$$year" } } ] } } ], cursor: {} } ) db.cakeSales.find()
The example:该示例:
creates a collection named 创建名为cakeSalescakeSales的集合
runs an 运行aggregate command that defines a year variable in the $merge let and adds the year to cakeSales using whenMatchedaggregate命令,该命令在$merge let中定义year变量,并使用whenMatched将年份添加到cakeSales
retrieves the 检索cakeSales documentcakeSales文档
Output:输出:
{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580,
"salesTrend" : "up", "salesYear" : "2020" }New in version 5.0.
You can define variables in the 您可以在aggregate command let and use the variables in the $merge stage whenMatched field.aggregate命令let中定义变量,并在$merge阶段whenMatched字段中使用变量。
Example:示例:
db.cakeSales.insertOne( { _id: 1, flavor: "chocolate", salesTotal: 1580, salesTrend: "up" } ) db.runCommand( { aggregate: db.cakeSales.getName(), pipeline: [ { $merge: { into: db.cakeSales.getName(), whenMatched: [ { $addFields: { "salesYear": "$$year" } } ] } } ], cursor: {}, let : { year: "2020" } } ) db.cakeSales.find()
The example:该示例:
creates a collection named 创建名为cakeSalescakeSales的集合
runs an 运行aggregate command that defines a year variable in the aggregate command let and adds the year to cakeSales using whenMatchedaggregate命令,该命令在aggregate命令let中定义year变量,并使用whenMatched将年份添加到cakeSales
retrieves the 检索cakeSales documentcakeSales文档
Output:输出:
{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580,
"salesTrend" : "up", "salesYear" : "2020" }You can define variables in the 您可以在$merge stage and, starting in MongoDB 5.0, the aggregate command.$merge阶段定义变量,并从MongoDB 5.0开始定义aggregate命令。
If two variables with the same name are defined in the 如果$merge stage and the aggregate command, the $merge stage variable is used.$merge阶段和aggregate命令中定义了两个同名的变量,则使用$merge阶段变量。
In this example, the 在此示例中,使用year: "2020" $merge stage variable is used instead of the year: "2019" aggregate command variable:year: "2020" $merge阶段变量,而不是year: "2019"aggregate命令变量:
db.cakeSales.insertOne( { _id: 1, flavor: "chocolate", salesTotal: 1580, salesTrend: "up" } ) db.runCommand( { aggregate: db.cakeSales.getName(), pipeline: [ { $merge: { into: db.cakeSales.getName(), let : { year: "2020" }, whenMatched: [ { $addFields: { "salesYear": "$$year" } } ] } } ], cursor: {}, let : { year: "2019" } } ) db.cakeSales.find()
Output:
{
_id: 1,
flavor: 'chocolate',
salesTotal: 1580,
salesTrend: 'up',
salesYear: '2020'
}