Database Manual / Reference / Query Language / Aggregation Stages

$merge (aggregation stage)(聚合阶段)

Definition定义

Note

This page describes the $merge stage, which outputs the aggregation pipeline results to a collection. 此页面描述了$merge阶段,该阶段将聚合管道结果输出到集合。For the $mergeObjects operator, which merges documents into a single document, see $mergeObjects.有关将文档合并为单个文档的$mergeObjects运算符,请参阅$mergeObject

$merge

Writes the results of the aggregation pipeline to a specified collection. The $merge operator must be the last stage in the pipeline.聚合管道的结果写入指定的集合。$merge运算符必须是管道中的最后一个阶段。

The $merge stage:$merge阶段:

  • Can output to a collection in the same or different database.可以输出到相同或不同数据库中的集合。
  • Can output to the same collection that is being aggregated. For more information, see Output to the Same Collection that is Being Aggregated.可以输出到正在聚合的同一集合。有关更多信息,请参阅输出到正在聚合的同一集合
  • Consider the following points when using $merge or $out stages in an aggregation pipeline:在聚合管道中使用$merge$out阶段时,请考虑以下几点:

    • Starting in MongoDB 5.0, pipelines with a $merge stage can run on replica set secondary nodes if all the nodes in the cluster have the featureCompatibilityVersion set to 5.0 or higher and the read preference allows secondary reads.从MongoDB 5.0开始,如果集群中的所有节点都将featureCompatibilityVersion设置为5.0或更高,并且读取首选项允许二次读取,则具有$merge阶段的管道可以在副本集secondary节点上运行。

      • $merge and $out stages run on secondary nodes, but write operations are sent to the primary node.$merge$out阶段在辅助节点上运行,但写操作被发送到primary节点。
      • Not all driver versions support $merge operations sent to the secondary nodes. For details, see the driver documentation.并非所有驱动程序版本都支持发送到辅助节点的$merge操作。有关详细信息,请参阅驱动程序文档。
    • In earlier MongoDB versions, pipelines with $out or $merge stages always run on the primary node and read preference isn't considered.在早期的MongoDB版本中,具有$out$merge阶段的管道始终在主节点上运行,并且不考虑读取偏好。
  • Creates a new collection if the output collection does not already exist.如果输出集合不存在,则创建新集合。
  • Can incorporate results (insert new documents, merge documents, replace documents, keep existing documents, fail the operation, process documents with a custom update pipeline) into an existing collection.可以将结果(插入新文档、合并文档、替换文档、保留现有文档、操作失败、使用自定义更新管道处理文档)合并到现有集合中。
  • Can output to a sharded collection. Input collection can also be sharded.可以输出到分片集合。输入集合也可以分片。

For a comparison with the $out stage which also outputs the aggregation results to a collection, see $merge and $out Comparison.有关与也将聚合结果输出到集合的$out阶段的比较,请参阅$merge$out的比较

Note

On-Demand Materialized Views按需物化视图

$merge can incorporate the pipeline results into an existing output collection rather than perform a full replacement of the collection. 可以将管道结果合并到现有的输出集合中,而不是完全替换该集合。This functionality allows users to create on-demand materialized views, where the content of the output collection is incrementally updated when the pipeline is run.此功能允许用户创建按需的物化视图,当管道运行时,输出集合的内容会增量更新。

For more information on this use case, see On-Demand Materialized Views as well as the examples on this page.有关此用例的更多信息,请参阅按需物化视图以及本页上的示例

Materialized views are separate from read-only views. For information on creating read-only views, see read-only views.物化视图与只读视图是分开的。有关创建只读视图的信息,请参阅只读视图

Compatibility兼容性

You can use $merge for deployments hosted in the following environments:您可以将$merge用于在以下环境中托管的部署:

  • MongoDB Atlas: The fully managed service for MongoDB deployments in the cloud:云中MongoDB部署的完全托管服务
  • MongoDB Enterprise: The subscription-based, self-managed version of MongoDB:MongoDB的基于订阅的自我管理版本
  • MongoDB Community: The source-available, free-to-use, and self-managed version of MongoDB:MongoDB的源代码可用、免费使用和自我管理版本

Syntax语法

$merge has the following syntax:具有以下语法:

{ $merge: {
into: <collection> -or- { db: <db>, coll: <collection> },
on: <identifier field> -or- [ <identifier field1>, ...], // Optional
let: <variables>, // Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail> // Optional
} }

For example:例如:

{ $merge: { into: "myOutput", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }

If using all default options for $merge, including writing to a collection in the same database, you can use the simplified form:如果使用$merge的所有默认选项,包括写入同一数据库中的集合,则可以使用简化形式:

{ $merge: <collection> } // Output collection is in the same database

The $merge takes a document with the following fields:$merge接受具有以下字段的文档:

Field字段Description描述
into

The output collection. Specify either:输出集合。指定以下任一项:

  • The collection name as a string to output to a collection in the same database where the aggregation is run. For example:将集合名称作为字符串输出到运行聚合的同一数据库中的集合。例如:

    into: "myOutput"

  • The database and collection name in a document to output to a collection in the specified database. For example:文档中的数据库和集合名称,以输出到指定数据库中的集合。例如:

    into: { db:"myDB", coll:"myOutput" }

If the output collection does not exist, $merge creates the collection:如果输出集合不存在,$merge将创建该集合:

  • For a replica set or a standalone, if the output database does not exist, $merge also creates the database.对于副本集或单机版副本,如果输出数据库不存在,$merge也会创建数据库。
  • For a sharded cluster, the specified output database must already exist.对于分片集群,指定的输出数据库必须已经存在。

The output collection can be a sharded collection.输出集合可以是分片集合。

on

Optional. 可选。Field or fields that act as a unique identifier for a document. The identifier determines if a results document matches an existing document in the output collection. Specify either:用作文档唯一标识符的一个或多个字段。标识符确定结果文档是否与输出集合中的现有文档匹配。指定以下任一项:

  • A single field name as a string. For example:字符串形式的单个字段名。例如:

    on: "_id"

  • A combination of fields in an array. For example:数组中的字段组合。例如:

    on: [ "date", "customerId" ]

    The order of the fields in the array does not matter, and you cannot specify the same field multiple times.数组中字段的顺序无关紧要,您不能多次指定同一字段。

For the specified field or fields:对于指定的一个或多个字段:

  • The aggregation results documents must contain the field(s) specified in the on, unless the on field is the _id field. If the _id field is missing from a results document, MongoDB adds it automatically.聚合结果文档必须包含on中指定的字段,除非on字段是_id字段。如果结果文档中缺少_id字段,MongoDB会自动添加它。
  • For deployments running MongoDB 8.0 and earlier, specified field or fields for on cannot be missing or contain a null value. 对于运行MongoDB 8.0及更早版本的部署,on的指定字段不能缺失或包含空值。Starting in MongoDB 8.1, if the supporting index is not sparse, the specified field or fields for on can be missing or contain a null value.从MongoDB 8.1开始,如果支持索引不是稀疏的,on的指定字段可能会丢失或包含空值。
  • The specified field or fields cannot contain an array value.指定的一个或多个字段不能包含数组值。

$merge requires a unique index with keys that correspond to the on identifier fields. $merge需要一个唯一的索引,其键与on标识符字段对应。Although the order of the index key specification does not matter, the unique index must only contain the on fields as its keys.虽然索引键规范的顺序无关紧要,但唯一索引必须只包含on字段作为其键。

  • The index must also have the same collation as the aggregation's collation.索引的排序规则也必须与聚合的排序规则相同。
  • The unique index can be a sparse index.唯一索引可以是稀疏索引。
  • The unique index cannot be a partial index.唯一索引不能是部分索引。
  • For output collections that already exist, the corresponding index must already exist.对于已经存在的输出集合,相应的索引必须已经存在。

The default value for on depends on the output collection:on的默认值取决于输出集合:

  • If the output collection does not exist, the on identifier must be and defaults to the _id field. The corresponding unique _id index is automatically created.如果输出集合不存在,则on标识符必须为,默认为_id字段。相应的唯一_id索引会自动创建。

    • To use a different on identifier field(s) for a collection that does not exist, you can create the collection first by creating a unique index on the desired field(s). 要为不存在的集合使用不同的on标识符字段,您可以首先通过在所需字段上创建唯一索引来创建集合。See the section on non-existent output collection for an example.请参阅关于不存在的输出集合的部分以获取示例。
  • If the existing output collection is unsharded, the on identifier defaults to the _id field.如果现有的输出集合未被记录,则on标识符默认为_id字段。
  • If the existing output collection is a sharded collection, the on identifier defaults to all the shard key fields and the _id field. 如果现有的输出集合是分片集合,则on标识符默认为所有分片键字段和_id字段。If specifying a different on identifier, the on must contain all the shard key fields.如果指定不同的on标识符,on必须包含所有分片键字段。
whenMatched

Optional. 可选。The behavior of $merge if a result document and an existing document in the collection have the same value for the specified on field(s).如果结果文档和集合中的现有文档在指定的on字段中具有相同的值,则$merge的行为。

You can specify either:您可以指定:

  • One of the pre-defined action strings:预定义的操作字符串之一:

    Action操作Description描述
    "replace"

    Replace the existing document in the output collection with the matching results document.用匹配的结果文档替换输出集合中的现有文档。

    When performing a replace, the replacement document cannot result in a modification of the _id value or, if the output collection is sharded, the shard key value. 执行替换时,替换文档不能导致_id值的修改,或者如果输出集合是分片的,则不能导致分片键值的修改。Otherwise, the operation generates an error.否则,操作将产生错误。

    To avoid this error, if the on field does not include the _id field, remove the _id field in the aggregation results to avoid the error, such as with a preceding $unset stage, and so on.为了避免此错误,如果on字段不包括_id字段,请在聚合结果中删除_id字段以避免错误,例如前面的$unset阶段,等等。

    "keepExisting"

    Keep the existing document in the output collection.将现有文档保留在输出集合中。

    "merge" (Default)(默认)

    Merge the matching documents (similar to the $mergeObjects operator).合并匹配的文档(类似于$mergeObjects运算符)。

    • If the results document contains fields not in the existing document, add these new fields to the existing document.如果结果文档包含现有文档中没有的字段,请将这些新字段添加到现有文档中。
    • If the results document contains fields in the existing document, replace the existing field values with those from the results document.如果结果文档包含现有文档中的字段,请将现有字段值替换为结果文档中的值。

    For example, if the output collection has the document:例如,如果输出集合有文档:

    { _id: 1, a: 1, b: 1 }

    And the aggregation results has the document:聚合结果有以下文档:

    { _id: 1, b: 5, z: 1 }

    Then, the merged document is:那么,合并后的文档是:

    { _id: 1, a: 1, b: 5, z: 1 }

    When performing a merge, the merged document cannot result in a modification of the _id value or, if the output collection is sharded, the shard key value. Otherwise, the operation generates an error.执行合并时,合并后的文档不能导致_id值的修改,或者如果输出集合是分片的,则不能导致分片键值的修改。否则,操作将产生错误。

    To avoid this error, if the on field does not include the _id field, remove the _id field in the aggregation results to avoid the error, such as with a preceding $unset stage, and so on.为了避免此错误,如果on字段不包括_id字段,请在聚合结果中删除_id字段以避免错误,例如前面的$unset阶段,等等。

    "fail"

    Stop and fail the aggregation operation. Any changes to the output collection from previous documents are not reverted.停止聚合操作并使其失败。对以前文档的输出集合所做的任何更改都不会恢复。

  • An aggregation pipeline to update the document in the collection.用于更新集合中文档的聚合管道。

    [ <stage1>, <stage2> ... ]

    The pipeline can only consist of the following stages:管道只能由以下阶段组成:

    The pipeline cannot modify the on field's value. For example, if you are matching on the field month, the pipeline cannot modify the month field.管道无法修改on字段的值。例如,如果您在字段month上进行匹配,则管道无法修改month字段。

    The whenMatched pipeline can directly access the fields of the existing documents in the output collection using $<field>.whenMatched管道可以使用$<field>直接访问输出集合中现有文档的字段。

    To access the fields from the aggregation results documents, use either:要访问聚合结果文档中的字段,请使用以下任一方法:

    • The built-in $$new variable to access the field. 访问字段的内置$$new变量。Specifically, $$new.<field>. 具体来说,$$new.<field>The $$new variable is only available if the let specification is omitted.$$new变量仅在省略let规范时可用。
    • The user-defined variables in the let field.let字段中的用户定义变量。

      Specify the double dollar sign ($$) prefix together with the variable name in the form $$<variable_name>. $$<variable_name>的形式指定双美元符号($$)前缀和变量名。For example, $$year. 例如,$$yearIf the variable is set to a document, you can also include a document field in the form $$<variable_name>.<field>. For example, $$year.month.如果变量设置为文档,您还可以以$$<variable_name>.<field>的形式包含文档字段。例如,$$year.month

      For more examples, see Use Variables to Customize the Merge.有关更多示例,请参阅使用变量自定义合并

let

Optional. 可选。Specifies variables for use in the whenMatched pipeline.指定在whenMatched管道中使用的变量。

Specify a document with the variable names and value expressions:使用变量名和值表达式指定文档:

{ <variable_name_1>: <expression_1>,
...,
<variable_name_n>: <expression_n> }

If unspecified, defaults to { new: "$$ROOT" } (see ROOT). 如果未指定,则默认为{ new: "$$ROOT" }(请参阅ROOT)。The whenMatched pipeline can access the $$new variable.whenMatched管道可以访问$$new变量。

To access the variables in the whenMatched pipeline:要访问whenMatched管道中的变量:

Specify the double dollar sign ($$) prefix together with the variable name in the form $$<variable_name>. $$<variable_name>的形式指定双美元符号($$)前缀和变量名。For example, $$year. 例如,$$yearIf the variable is set to a document, you can also include a document field in the form $$<variable_name>.<field>. For example, $$year.month.如果变量设置为文档,您还可以在$$<variable_name>.<field>表单中包含文档字段。例如,$$year.month

For examples, see Use Variables to Customize the Merge.有关示例,请参阅使用变量自定义合并

whenNotMatched

Optional. 可选。The behavior of $merge if a result document does not match an existing document in the out collection.如果结果文档与输出集合中的现有文档不匹配,则$merge的行为。

You can specify one of the pre-defined action strings:您可以指定一个预定义的操作字符串:

Action操作Description描述
"insert" (Default)(默认)

Insert the document into the output collection.将文档插入到输出集合中。

"discard"

Discard the document. Specifically, $merge does not insert the document into the output collection.丢弃文档。具体来说,$merge不会将文档插入到输出集合中。

"fail"

Stop and fail the aggregation operation. Any changes already written to the output collection are not reverted.停止聚合操作并使其失败。已写入输出集合的任何更改都不会恢复。

Considerations注意事项

_id Field Generation字段生成

If the _id field is not present in a document from the aggregation pipeline results, the $merge stage generates it automatically.如果聚合管道结果中的文档中不存在_id字段,则$merge阶段会自动生成它。

For example, in the following aggregation pipeline, $project excludes the _id field from the documents passed into $merge. 例如,在以下聚合管道中,$project从传递到$merge的文档中排除_id字段。When $merge writes these documents to the "newCollection", $merge generates a new _id field and value.$merge将这些文档写入"newCollection"时,$merge会生成一个新的_id字段和值。

db.sales.aggregate( [
{ $project: { _id: 0 } },
{ $merge : { into : "newCollection" } }
] )

Create a New Collection if Output Collection is Non-Existent如果输出集合不存在,则创建新集合

The $merge operation creates a new collection if the specified output collection does not exist.如果指定的输出集合不存在,则$merge操作将创建一个新集合。

  • The output collection is created when $merge writes the first document into the collection and is immediately visible.输出集合是在$merge将第一个文档写入集合时创建的,并且立即可见。
  • If the aggregation fails, any writes completed by the $merge before the error will not be rolled back.如果聚合失败,则$merge在错误发生之前完成的任何写入都不会回滚。

Note

For a replica set or a standalone, if the output database does not exist, $merge also creates the database.对于副本集或独立副本,如果输出数据库不存在,$merge也会创建数据库。

For a sharded cluster, the specified output database must already exist.对于分片集群,指定的输出数据库必须已经存在。

If the output collection does not exist, $merge requires the on identifier to be the _id field. 如果输出集合不存在,$merge要求on标识符是_id字段。To use a different on field value for a collection that does not exist, you can create the collection first by creating a unique index on the desired field(s) first. 要为不存在的集合使用不同的on字段值,您可以首先在所需的字段上创建唯一索引,从而创建集合。For example, if the output collection newDailySales201905 does not exist and you want to specify the salesDate field as the on identifier:例如,如果输出集合newDailySales201905不存在,并且您想将salesDate字段指定为on标识符:

db.newDailySales201905.createIndex( { salesDate: 1 }, { unique: true } )

db.sales.aggregate( [
{ $match: { date: { $gte: new Date("2019-05-01"), $lt: new Date("2019-06-01") } } },
{ $group: { _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } }, totalqty: { $sum: "$quantity" } } },
{ $project: { _id: 0, salesDate: { $toDate: "$_id" }, totalqty: 1 } },
{ $merge : { into : "newDailySales201905", on: "salesDate" } }
] )

Output to a Sharded Collection输出到分片集合

The $merge stage can output to a sharded collection. $merge阶段可以输出到分片集合。When the output collection is sharded, $merge uses the _id field and all the shard key fields as the default on identifier. 当输出集合被分片时,$merge使用_id字段和所有分片键字段作为标识符的默认值。If you override the default, the on identifier must include all the shard key fields:如果覆盖默认值,on标识符必须包含所有分片键字段:

{ $merge: {
into: "<shardedColl>" or { db:"<sharding enabled db>", coll: "<shardedColl>" },
on: [ "<shardkeyfield1>", "<shardkeyfield2>",... ], // Shard key fields and any additional fields分片键段和任何其他字段
let: <variables>, // Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail> // Optional
} }

For example, use the sh.shardCollection() method to create a new sharded collection newrestaurants with the postcode field as the shard key.例如,使用sh.shardCollection()方法创建一个新的分片集合newrestaurants,其中postcode(邮政编码)字段作为分片键。

sh.shardCollection(
"exampledb.newrestaurants", // Namespace of the collection to shard
{ postcode: 1 }, // Shard key
);

The newrestaurants collection will contain documents with information on new restaurant openings by month (date field) and postcode (shard key); specifically, the on identifier is ["date", "postcode"] (the ordering of the fields does not matter). newrestaurants集合将包含按月份(date字段)和邮政编码(分片键)列出的新餐厅开业信息的文档;具体来说,on标识符是["date", "postcode"](字段的顺序无关紧要)。Because $merge requires a unique index with keys that correspond to the on identifier fields, create the unique index (the ordering of the fields do not matter): 因为$merge需要一个具有与标识符字段对应的键的唯一索引,所以创建唯一索引(字段的顺序无关紧要):[1]

use exampledb
db.newrestaurants.createIndex( { postcode: 1, date: 1 }, { unique: true } )

With the sharded collection restaurants and the unique index created, you can use $merge to output the aggregation results to this collection, matching on [ "date", "postcode" ] as in this example:创建了分片集合restaurants和唯一索引后,您可以使用$merge将聚合结果输出到此集合,在[ "date", "postcode" ]进行匹配,如下例所示:

use exampledb

db.openings.aggregate([
{ $group: {
_id: { date: { $dateToString: { format: "%Y-%m", date: "$date" } }, postcode: "$postcode" },
restaurants: { $push: "$restaurantName" } } },
{ $project: { _id: 0, postcode: "$_id.postcode", date: "$_id.date", restaurants: 1 } },
{ $merge: { into: "newrestaurants", "on": [ "date", "postcode" ], whenMatched: "replace", whenNotMatched: "insert" } }
])
[1] The sh.shardCollection() method can also create a unique index on the shard key when passed the { unique: true } option if: the shard key is range-based, the collection is empty, and a unique index on the shard key doesn't already exist.当传递{ unique: true }选项时,sh.shardCollection()方法还可以在分片键上创建一个唯一索引,如果:分片键是基于范围的,集合是空的,并且分片键的唯一索引不存在。In the previous example, because the on identifier is the shard key and another field, a separate operation to create the corresponding index is required.在前面的示例中,由于on标识符是分片键和另一个字段,因此需要单独的操作来创建相应的索引。

Replace Documents ($merge) vs Replace Collection ($out)替换文档($merge)与替换集合($out

$merge can replace an existing document in the output collection if the aggregation results contain a document or documents that match based on the on specification. 如果聚合结果包含基于on规范匹配的一个或多个文档,则可以替换输出集合中的现有文档。As such, $merge can replace all documents in the existing collection if the aggregation results include matching documents for all existing documents in the collection and you specify "replace" for whenMatched.因此,如果聚合结果包括集合中所有现有文档的匹配文档,并且您为匹配时指定了"replace",则$merge可以替换现有集合中的所有文档。

However, to replace an existing collection regardless of the aggregation results, use $out instead.但是,要替换现有集合而不管聚合结果如何,请使用$out

Existing Documents and _id and Shard Key Values现有文档、_id和分片键值

The $merge errors if the $merge results in a change to an existing document's _id value.如果$merge导致现有文档的_id值发生更改,则$merge错误。

Tip

To avoid this error, if the on field does not include the _id field, remove the _id field in the aggregation results to avoid the error, such as with a preceding $unset stage, and so on.为了避免此错误,如果on字段不包括_id字段,请在聚合结果中删除_id字段以避免错误,例如前面的$unset阶段,等等。

Additionally, for a sharded collection, $merge also generates an error if it results in a change to the shard key value of an exising document.此外,对于分片集合,如果$merge导致现有文档的分片键值发生变化,它也会生成错误。

Any writes completed by the $merge before the error will not be rolled back.$merge在错误发生之前完成的任何写入都不会回滚。

Unique Index Constraints唯一索引约束

If the unique index used by $merge for on field(s) is dropped mid-aggregation, there is no guarantee that the aggregation will be killed. 如果$merge在字段上使用的唯一索引在聚合过程中被丢弃,则无法保证聚合会被终止。If the aggregation continues, there is no guarantee that documents do not have duplicate on field values.如果聚合继续进行,则无法保证文档在没有重复on字段值。

If the $merge attempts to write a document that violates any unique index on the output collection, the operation generates an error. For example:如果$merge试图写入违反输出集合上任何唯一索引的文档,则操作将生成错误。例如:

  • Insert a non-matching document插入不匹配的文档 that violates a unique index other than the index on the on field(s).这违反了除on字段上的索引之外的唯一索引。
  • Fail if there is a matching document in the collection. Specifically, the operation attempts to insert the matching document which violates the unique index on the on field(s).如果集合中有匹配的文档,则失败。具体来说,该操作试图插入违反on字段唯一索引的匹配文档。
  • Replace an existing document with a new document that violates a unique index other than the index on the on field(s).用违反唯一索引的新文档替换现有文档,该唯一索引不是on字段上的索引。
  • Merge the matching documents that results in a document that violates a unique index other than the index on the on field(s).合并匹配的文档,这些文档会导致文档违反唯一索引,而不是on字段上的索引。

Schema Validation模式验证

If your collection uses schema validation and has validationAction set to error, inserting an invalid document or updating a document with invalid values with $merge throws a MongoServerError and the document is not written to the target collection. 如果您的集合使用模式验证并且将validationAction设置为error,则插入无效文档或使用$merge更新具有无效值的文档会抛出MongoServerError,并且文档不会写入目标集合。If there are multiple invalid documents, only the first invalid document encountered throws an error. 如果有多个无效文档,则只有遇到的第一个无效文档会抛出错误。All valid documents are written to the target collection, and all invalid documents fail to write.所有有效文档都写入目标集合,所有无效文档都无法写入。

whenMatched Pipeline Behavior管道行为

If all of the following are true for a $merge stage, $merge inserts the document directly into the output collection:如果对于$merge阶段,以下所有条件都为真,则$merge会将文档直接插入到输出集合中:

  • The value of whenMatched is an aggregation pipeline,whenMatched的值是一个聚合管道,
  • The value of whenNotMatched is insert, andwhenNotMatched的值是insert,以及
  • There is no match for a document in the output collection,输出集合中没有匹配的文档,

$merge inserts the document directly into the output collection.将文档直接插入到输出集合中。

$merge and $out Comparison的比较

With the introduction of $merge, MongoDB provides two stages, $merge and $out, for writing the results of the aggregation pipeline to a collection:随着$merge的引入,MongoDB提供了两个阶段,$merge$out,用于将聚合管道的结果写入集合:

$merge$out
  • Can output to a collection in the same or different database.可以输出到相同或不同数据库中的集合。
  • Can output to a collection in the same or different database.可以输出到相同或不同数据库中的集合。
  • Creates a new collection if the output collection does not already exist.如果输出集合不存在,则创建新集合。
  • Creates a new collection if the output collection does not already exist.如果输出集合不存在,则创建新集合。
  • Can incorporate results (insert new documents, merge documents, replace documents, keep existing documents, fail the operation, process documents with a custom update pipeline) into an existing collection.可以将结果(插入新文档、合并文档、替换文档、保留现有文档、操作失败、使用自定义更新管道处理文档)合并到现有集合中。

    See also Replace Documents ($merge) vs Replace Collection ($out).另请参见替换文档$merge)与替换集合($out)。

  • Replaces the output collection completely if it already exists.如果输出集合已存在,则完全替换它。
  • Can output to a sharded collection. Input collection can also be sharded.可以输出到分片集合。输入集合也可以分片。
  • Cannot output to a sharded collection. Input collection, however, can be sharded.无法输出到分片集合。然而,输入集合可以分片。
  • Corresponds to SQL statements:对应于SQL语句:

    • MERGE.
    • INSERT INTO T2 SELECT FROM T1.
    • SELECT INTO T2 FROM T1.
    • Create/Refresh Materialized Views.创建/刷新物化视图。
  • Corresponds to SQL statement:对应SQL语句:

    • INSERT INTO T2 SELECT FROM T1.
    • SELECT INTO T2 FROM T1.

Output to the Same Collection that is Being Aggregated输出到正在聚合的同一集合

Warning

When $merge outputs to the same collection that is being aggregated, documents may get updated multiple times or the operation may result in an infinite loop. $merge输出到正在聚合的同一集合时,文档可能会多次更新,或者该操作可能会导致无限循环。This behavior occurs when the update performed by $merge changes the physical location of documents stored on disk. $merge执行的更新更改了存储在磁盘上的文档的物理位置时,就会发生这种行为。When the physical location of a document changes, $merge may view it as an entirely new document, resulting in additional updates. 当文档的物理位置发生变化时,$merge可能会将其视为一个全新的文档,从而导致其他更新。For more information on this behavior, see Halloween Problem.有关此行为的更多信息,请参阅万圣节问题

$merge can output to the same collection that is being aggregated. You can also output to a collection which appears in other stages of the pipeline, such as $lookup.可以输出到正在聚合的同一集合。您还可以输出到出现在管道其他阶段的集合,例如$lookup

Restrictions限制

Restrictions限制Description描述
transactionsAn aggregation pipeline cannot use $merge inside a transaction.聚合管道不能在事务中使用$merge
Time Series Collections时间序列集合An aggregation pipeline cannot use $merge to output to a time series collection.聚合管道不能使用$merge输出到时间序列集合。
view definition视图定义
Separate from materialized view与物化视图分离
A view definition cannot include the $merge stage. 视图定义不能包含$merge阶段。If the view definition includes nested pipeline (for example, the view definition includes $facet stage), this $merge stage restriction applies to the nested pipelines as well.如果视图定义包括嵌套管道(例如,视图定义包括$facet阶段),则此$merge阶段限制也适用于嵌套管道。
$lookup stage阶段$lookup stage's nested pipeline cannot include the $merge stage.$lookup阶段的嵌套管道不能包含$merge阶段。
$facet stage阶段$facet stage's nested pipeline cannot include the $merge stage.$facet阶段的嵌套管道不能包含$merge阶段。
$unionWith stage阶段$unionWith stage's nested pipeline cannot include the $merge stage.$unionWith阶段的嵌套管道不能包含$merge阶段。
"linearizable" read concern读取关注The $merge stage cannot be used in conjunction with read concern "linearizable". $merge阶段不能与读取关注"linearizable"结合使用。That is, if you specify "linearizable" read concern for db.collection.aggregate(), you cannot include the $merge stage in the pipeline.也就是说,如果为db.collection.aggregate()指定"linearizable"读取关注,则不能在管道中包含$merge阶段。

Examples示例

MongoDB Shell

On-Demand Materialized View: Initial Creation按需物化视图:初步创建

If the output collection does not exist, the $merge creates the collection.如果输出集合不存在,则$merge将创建该集合。

For example, a collection named salaries in the zoo database is populated with the employee salary and department history:例如,zoo(动物园)数据库中名为salaries(工资)的集合填充了员工工资和部门历史记录:

db.getSiblingDB("zoo").salaries.insertMany([
{ "_id" : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ "_id" : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ "_id" : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ "_id" : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ "_id" : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ "_id" : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ "_id" : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ "_id" : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ "_id" : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ "_id" : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
])

You can use the $group and $merge stages to initially create a collection named budgets (in the reporting database) from the data currently in the salaries collection:您可以使用$group$merge阶段从salaries集合中的当前数据中初始创建一个名为budgets(预算)的集合(在reporting数据库中):

Note

For a replica set or a standalone deployment, if the output database does not exist, $merge also creates the database.对于副本集或独立部署,如果输出数据库不存在,$merge也会创建数据库。

For a sharded cluster deployment, the specified output database must already exist.对于分片集群部署,指定的输出数据库必须已经存在。

db.getSiblingDB("zoo").salaries.aggregate( [
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
{ $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )
  • $group stage to group the salaries by the fiscal_year and dept.$group阶段,按fiscal_year(财政年度)和dept(部门)对工资进行分组。
  • $merge stage writes the output of the preceding $group stage to the budgets collection in the reporting database.$merge阶段将前一个$group阶段的输出写入报告数据库中的预算集合。

To view the documents in the new budgets collection:要查看新budgets集合中的文档,请执行以下操作:

db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )

The budgets collection contains the following documents:budgets集合包含以下文件:

{ "_id" : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ "_id" : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ "_id" : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 }
{ "_id" : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }

On-Demand Materialized View: Update/Replace Data按需物化视图:更新/替换数据

The following example uses the collections in the previous example.以下示例使用了前一个示例中的集合。

The example salaries collection contains the employee salary and department history:示例salaries集合包含员工工资和部门历史记录:

db.salaries.insertMany( [
{ _id : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ _id : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ _id : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ _id : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ _id : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ _id : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ _id : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ _id : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ _id : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ _id : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
] )

The example budgets collection contains the cumulative yearly budgets:示例budgets集合包含累计年度预算:

db.budgets.insertMany( [
{ _id : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 },
{ _id : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 },
{ _id : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 },
{ _id : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 },
{ _id : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 125000 },
{ _id : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 310000 }
] )

During the current fiscal year (2019 in this example), new employees are added to the salaries collection and new head counts are pre-allocated for the next year:在当前财政年度(本例中为2019年),新员工被添加到salaries中,新员工人数被预先分配到下一年:

db.getSiblingDB("zoo").salaries.insertMany( [
{ _id : 11, employee: "Wren", dept: "Z", salary: 100000, fiscal_year: 2019 },
{ _id : 12, employee: "Zebra", dept: "A", salary: 150000, fiscal_year: 2019 },
{ _id : 13, employee: "headcount1", dept: "Z", salary: 120000, fiscal_year: 2020 },
{ _id : 14, employee: "headcount2", dept: "Z", salary: 120000, fiscal_year: 2020 }
] )

To update the budgets collection to reflect the new salary information, the following aggregation pipeline uses:为了更新budgets(预算)集合以反映新的薪资信息,以下聚合管道使用:

  • $match stage to find all documents with fiscal_year greater than or equal to 2019.阶段查找所有fiscal_year(财政年度)大于或等于2019年的文档。
  • $group stage to group the salaries by the fiscal_year and dept.阶段按fiscal_yeardept对工资进行分组。
  • $merge to write the result set to the budgets collection, replacing documents with the same _id value (in this example, a document with the fiscal year and dept). 要将结果集写入budgets集合,请用相同的_id替换文档(在本例中,是一个具有会计年度和部门的文档)。For documents that do not have matches in the collection, $merge inserts the new documents.对于集合中没有匹配项的文档,$merge会插入新文档。
db.getSiblingDB("zoo").salaries.aggregate( [
{ $match : { fiscal_year: { $gte : 2019 } } },
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, salaries: { $sum: "$salary" } } },
{ $merge : { into: { db: "reporting", coll: "budgets" }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )

After the aggregation is run, view the documents in the budgets collection:运行聚合后,查看budgets集合中的文档:

db.getSiblingDB("reporting").budgets.find().sort( { _id: 1 } )

The budgets collection incorporates the new salary data for fiscal year 2019 and adds new documents for fiscal year 2020:budgets集合纳入了2019财年的新工资数据,并添加了2020财年的新文件:

   { _id : { "fiscal_year" : 2017, "dept" : "A" }, "salaries" : 220000 }
{ _id : { "fiscal_year" : 2017, "dept" : "Z" }, "salaries" : 115000 }
{ _id : { "fiscal_year" : 2018, "dept" : "A" }, "salaries" : 215000 }
{ _id : { "fiscal_year" : 2018, "dept" : "Z" }, "salaries" : 280000 }
{ _id : { "fiscal_year" : 2019, "dept" : "A" }, "salaries" : 275000 }
{ _id : { "fiscal_year" : 2019, "dept" : "Z" }, "salaries" : 410000 }
{ _id : { "fiscal_year" : 2020, "dept" : "Z" }, "salaries" : 240000 }

Only Insert New Data仅插入新数据

To ensure that the $merge does not overwrite existing data in the collection, set whenMatched to keepExisting or fail.为确保$merge不会覆盖集合中的现有数据,请将whenMatched设置为keepExistingfail

The example salaries collection in the zoo database contains the employee salary and department history:zoo数据库中的示例salaries集合包含员工工资和部门历史记录:

db.salaries.insertMany( [
{ _id : 1, employee: "Ant", dept: "A", salary: 100000, fiscal_year: 2017 },
{ _id : 2, employee: "Bee", dept: "A", salary: 120000, fiscal_year: 2017 },
{ _id : 3, employee: "Cat", dept: "Z", salary: 115000, fiscal_year: 2017 },
{ _id : 4, employee: "Ant", dept: "A", salary: 115000, fiscal_year: 2018 },
{ _id : 5, employee: "Bee", dept: "Z", salary: 145000, fiscal_year: 2018 },
{ _id : 6, employee: "Cat", dept: "Z", salary: 135000, fiscal_year: 2018 },
{ _id : 7, employee: "Gecko", dept: "A", salary: 100000, fiscal_year: 2018 },
{ _id : 8, employee: "Ant", dept: "A", salary: 125000, fiscal_year: 2019 },
{ _id : 9, employee: "Bee", dept: "Z", salary: 160000, fiscal_year: 2019 },
{ _id : 10, employee: "Cat", dept: "Z", salary: 150000, fiscal_year: 2019 }
] )

A collection orgArchive in the reporting database contains historical departmental organization records for the past fiscal years. Archived records should not be modified.reporting数据库中的集合orgArchive包含过去财政年度的历史部门组织记录。不应修改已存档的记录。

db.orgArchive.insertMany( [
{ _id : ObjectId("5cd8c68261baa09e9f3622be"), "employees" : [ "Ant", "Gecko" ], "dept" : "A", "fiscal_year" : 2018 },
{ _id : ObjectId("5cd8c68261baa09e9f3622bf"), "employees" : [ "Ant", "Bee" ], "dept" : "A", "fiscal_year" : 2017 },
{ _id : ObjectId("5cd8c68261baa09e9f3622c0"), "employees" : [ "Bee", "Cat" ], "dept" : "Z", "fiscal_year" : 2018 },
{ _id : ObjectId("5cd8c68261baa09e9f3622c1"), "employees" : [ "Cat" ], "dept" : "Z", "fiscal_year" : 2017 }
] )

The orgArchive collection has a unique compound index on the fiscal_year and dept fields. Specifically, there should be at most one record for the same fiscal year and department combination:orgArchive集合在fiscal_yeardept字段上有一个独特的复合索引。具体来说,同一财政年度和部门组合最多只能有一条记录:

db.getSiblingDB("reporting").orgArchive.createIndex ( { fiscal_year: 1, dept: 1 }, { unique: true } )

At the end of current fiscal year (2019 in this example), the salaries collection contain the following documents:在当前财政年度结束时(本例中为2019年),salaries集合包含以下文件:

db.salaries.insertMany( [
{ _id : 1, "employee" : "Ant", "dept" : "A", "salary" : 100000, "fiscal_year" : 2017 },
{ _id : 2, "employee" : "Bee", "dept" : "A", "salary" : 120000, "fiscal_year" : 2017 },
{ _id : 3, "employee" : "Cat", "dept" : "Z", "salary" : 115000, "fiscal_year" : 2017 },
{ _id : 4, "employee" : "Ant", "dept" : "A", "salary" : 115000, "fiscal_year" : 2018 },
{ _id : 5, "employee" : "Bee", "dept" : "Z", "salary" : 145000, "fiscal_year" : 2018 },
{ _id : 6, "employee" : "Cat", "dept" : "Z", "salary" : 135000, "fiscal_year" : 2018 },
{ _id : 7, "employee" : "Gecko", "dept" : "A", "salary" : 100000, "fiscal_year" : 2018 },
{ _id : 8, "employee" : "Ant", "dept" : "A", "salary" : 125000, "fiscal_year" : 2019 },
{ _id : 9, "employee" : "Bee", "dept" : "Z", "salary" : 160000, "fiscal_year" : 2019 },
{ _id : 10, "employee" : "Cat", "dept" : "Z", "salary" : 150000, "fiscal_year" : 2019 },
{ _id : 11, "employee" : "Wren", "dept" : "Z", "salary" : 100000, "fiscal_year" : 2019 },
{ _id : 12, "employee" : "Zebra", "dept" : "A", "salary" : 150000, "fiscal_year" : 2019 },
{ _id : 13, "employee" : "headcount1", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 },
{ _id : 14, "employee" : "headcount2", "dept" : "Z", "salary" : 120000, "fiscal_year" : 2020 }
] )

To update the orgArchive collection to include the fiscal year 2019 that has just ended, the following aggregation pipeline uses:为了更新orgArchive集合以包括刚刚结束的2019财年,使用了以下聚合管道:

  • $match stage to find all documents with fiscal_year equal to 2019.阶段查找所有fiscal_year等于2019年的文件。
  • $group stage to group the employees by the fiscal_year and dept.阶段按fiscal_yeardept对员工进行分组。
  • $project stage to suppress the _id field and add separate dept and fiscal_year field. 阶段以抑制_id字段并添加单独的deptfiscal_year字段。When the documents are passed to $merge, $merge automatically generates a new _id field for the documents.当文档传递给$merge时,$merge会自动为文档生成一个新的_id字段。
  • $merge to write the result set to orgArchive.将结果集写入orgArchive

    The $merge stage matches documents on the dept and fiscal_year fields and fails when matched. 阶段匹配deptfiscal_year字段上的文档,匹配时failsThat is, if a document already exists for the same department and fiscal year, the $merge errors.也就是说,如果同一部门和财政年度的文档已经存在,则$merge会出错。

db.getSiblingDB("zoo").salaries.aggregate( [
{ $match: { fiscal_year: 2019 }},
{ $group: { _id: { fiscal_year: "$fiscal_year", dept: "$dept" }, employees: { $push: "$employee" } } },
{ $project: { _id: 0, dept: "$_id.dept", fiscal_year: "$_id.fiscal_year", employees: 1 } },
{ $merge : { into : { db: "reporting", coll: "orgArchive" }, on: [ "dept", "fiscal_year" ], whenMatched: "fail" } }
] )

After the operation, the orgArchive collection contains the following documents:操作后,orgArchive集合包含以下文档:

{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419f"), "employees" : [ "Ahn", "Bess" ], "dept" : "A", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc419e"), "employees" : [ "Ahn", "Gee" ], "dept" : "A", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438e"), "employees" : [ "Ahn", "Zeb" ], "dept" : "A", "fiscal_year" : 2019 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc41a0"), "employees" : [ "Carl" ], "dept" : "Z", "fiscal_year" : 2017 }
{ "_id" : ObjectId("5caccc6a66b22dd8a8cc41a1"), "employees" : [ "Bess", "Carl" ], "dept" : "Z", "fiscal_year" : 2018 }
{ "_id" : ObjectId("5caccd0b66b22dd8a8cc438d"), "employees" : [ "Bess", "Carl", "Wen" ], "dept" : "Z", "fiscal_year" : 2019 }

If the orgArchive collection already contained a document for 2019 for department "A" and/or "B", the aggregation fails because of the duplicate key error. However, any document inserted before the error will not be rolled back.如果orgArchive集合已包含部门"A"和/或"B"的2019年文档,则由于重复键错误,聚合失败。但是,在错误之前插入的任何文档都不会回滚。

If you specify keepExisting for the matching document, the aggregation does not affect the matching document and does not error with duplicate key error. 如果为匹配文档指定keepExisting,则聚合不会影响匹配文档,也不会出现重复键错误。Similarly, if you specify replace, the operation would not fail; however, the operation would replace the existing document.同样,如果指定replace,操作也不会失败;然而,该操作将替换现有文档。

Merge Results from Multiple Collections合并来自多个集合的结果

By default, if a document in the aggregation results matches a document in the collection, the $merge stage merges the documents.默认情况下,如果聚合结果中的文档与集合中的文档匹配,则$merge阶段会合并这些文档。

An example collection purchaseorders is populated with the purchase order information by quarter and regions:一个示例集合purchaseorders(采购订单)按季度和地区填充采购订单信息:

db.purchaseorders.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 200, reportDate: new Date("2019-04-01") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 300, reportDate: new Date("2019-04-01") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 700, reportDate: new Date("2019-04-01") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 300, reportDate: new Date("2019-07-01") },
{ _id: 5, quarter: "2019Q2", region: "C", qty: 1000, reportDate: new Date("2019-07-01") },
{ _id: 6, quarter: "2019Q2", region: "A", qty: 400, reportDate: new Date("2019-07-01") },
] )

Another example collection reportedsales is populated with the reported sales information by quarter and regions:另一个示例集合reportedsales按季度和地区填充报告的销售信息:

db.reportedsales.insertMany( [
{ _id: 1, quarter: "2019Q1", region: "A", qty: 400, reportDate: new Date("2019-04-02") },
{ _id: 2, quarter: "2019Q1", region: "B", qty: 550, reportDate: new Date("2019-04-02") },
{ _id: 3, quarter: "2019Q1", region: "C", qty: 1000, reportDate: new Date("2019-04-05") },
{ _id: 4, quarter: "2019Q2", region: "B", qty: 500, reportDate: new Date("2019-07-02") },
] )

Assume that, for reporting purposes, you want to view the data by quarter in the following format:假设出于报告目的,您希望按季度以以下格式查看数据:

{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }

You can use the $merge to merge in results from the purchaseorders collection and the reportedsales collection to create a new collection quarterlyreport.您可以使用$merge合并purchaseorders集合和reportedsales集合的结果,以创建新的集合quarterlyreport(季度报告)。

To create the quarterlyreport collection, you can use the following pipeline:要创建quarterlyreport集合,可以使用以下管道:

db.purchaseorders.aggregate( [
{ $group: { _id: "$quarter", purchased: { $sum: "$qty" } } }, // group purchase orders by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
First stage:第一阶段:

The $group stage groups by the quarter and uses $sum to add the qty fields into a new purchased field. For example:$group阶段按季度分组,并使用$sum将数量字段添加到新的购买字段中。例如:

To create the quarterlyreport collection, you can use this pipeline:要创建quarterlyreport集合,可以使用以下管道:

{ "_id" : "2019Q2", "purchased" : 1700 }
{ "_id" : "2019Q1", "purchased" : 1200 }
Second stage:第二阶段:
The $merge stage writes the documents to the quarterlyreport collection in the same database. $merge阶段将文档写入同一数据库中的quarterlyreport集合。If the stage finds an existing document in the collection that matches on the _id field, the stage merges the matching documents. 如果阶段在集合中找到与_id字段匹配的现有文档,则阶段会合并匹配的文档。Otherwise, the stage inserts the document. For the initial creation, no documents should match.否则,该阶段将插入文档。对于初始创建,不应匹配任何文档。

To view the documents in the collection, run the following operation:要查看集合中的文档,请运行以下操作:

db.quarterlyreport.find().sort( { _id: 1 } )

The collection contains the following documents:该集合包含以下文件:

{ "_id" : "2019Q1", "sales" : 1200, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 1700, "purchased" : 1700 }

Similarly, run the following aggregation pipeline against the reportedsales collection to merge the sales results into the quarterlyreport collection.同样,对reportedsales集合运行以下聚合管道,将销售结果合并到quarterlyreport集合中。

db.reportedsales.aggregate( [
{ $group: { _id: "$quarter", sales: { $sum: "$qty" } } }, // group sales by quarter
{ $merge : { into: "quarterlyreport", on: "_id", whenMatched: "merge", whenNotMatched: "insert" } }
])
First stage:第一阶段:

The $group stage groups by the quarter and uses $sum to add the qty fields into a new sales field. For example:$group阶段按季度分组,并使用$sum将数量字段添加到新的销售字段中。例如:

{ "_id" : "2019Q2", "sales" : 500 }
{ "_id" : "2019Q1", "sales" : 1950 }
Second stage:第二阶段:
The $merge stage writes the documents to the quarterlyreport collection in the same database. $merge阶段将文档写入同一数据库中的quarterlyreport集合。If the stage finds an existing document in the collection that matches on the _id field (the quarter), the stage merges the matching documents. 如果阶段在集合中找到与_id字段(季度)匹配的现有文档,则阶段会合并匹配的文档。Otherwise, the stage inserts the document.否则,该阶段将插入文档。

To view the documents in the quarterlyreport collection after the data has been merged, run the following operation:要在数据合并后查看quarterlyreport(季度报告)集合中的文档,请运行以下操作:

db.quarterlyreport.find().sort( { _id: 1 } )

The collection contains the following documents:该集合包含以下文件:

{ "_id" : "2019Q1", "sales" : 1950, "purchased" : 1200 }
{ "_id" : "2019Q2", "sales" : 500, "purchased" : 1700 }

Use the Pipeline to Customize the Merge使用管道自定义合并

The $merge can use a custom update pipeline when documents match. The whenMatched pipeline can have the following stages:当文档匹配时,$merge可以使用自定义更新管道whenMatched管道可以有以下阶段:

An example collection votes is populated with the daily vote tally. Create the collection with the following documents:一个示例集合votes(投票)是用每日投票计数填充的。使用以下文档创建集合:

db.votes.insertMany( [
{ date: new Date("2019-05-01"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-02"), "thumbsup" : 3, "thumbsdown" : 1 },
{ date: new Date("2019-05-03"), "thumbsup" : 1, "thumbsdown" : 1 },
{ date: new Date("2019-05-04"), "thumbsup" : 2, "thumbsdown" : 2 },
{ date: new Date("2019-05-05"), "thumbsup" : 6, "thumbsdown" : 10 },
{ date: new Date("2019-05-06"), "thumbsup" : 13, "thumbsdown" : 16 }
] )

Another example collection monthlytotals has the up-to-date monthly vote totals. Create the collection with the following document:另一个例子集合monthlytotals(每月汇总)具有最新月度投票总数。使用以下文档创建集合:

db.monthlytotals.insertOne(
{ "_id" : "2019-05", "thumbsup" : 26, "thumbsdown" : 31 }
)

At the end of each day, that day's votes is inserted into the votes collection:每天结束时,当天的选票会被插入到votes集合中:

db.votes.insertOne(
{ date: new Date("2019-05-07"), "thumbsup" : 14, "thumbsdown" : 10 }
)

You can use $merge with an custom pipeline to update the existing document in the collection monthlytotals:您可以使用$merge和自定义管道更新集合monthlytotals中的现有文档:

db.votes.aggregate([
{ $match: { date: { $gte: new Date("2019-05-07"), $lt: new Date("2019-05-08") } } },
{ $project: { _id: { $dateToString: { format: "%Y-%m", date: "$date" } }, thumbsup: 1, thumbsdown: 1 } },
{ $merge: {
into: "monthlytotals",
on: "_id",
whenMatched: [
{ $addFields: {
thumbsup: { $add:[ "$thumbsup", "$$new.thumbsup" ] },
thumbsdown: { $add: [ "$thumbsdown", "$$new.thumbsdown" ] }
} } ],
whenNotMatched: "insert"
} }
])
First stage:第一阶段:

The $match stage finds the specific day's votes. For example:$match阶段查找特定日期的投票。例如:

{ "_id" : ObjectId("5ce6097c436eb7e1203064a6"), "date" : ISODate("2019-05-07T00:00:00Z"), "thumbsup" : 14, "thumbsdown" : 10 }
Second stage:第二阶段:

The $project stage sets the _id field to a year-month string. For example:$project阶段将_id字段设置为年月字符串。例如:

{ "thumbsup" : 14, "thumbsdown" : 10, "_id" : "2019-05" }
Third stage:第三阶段:

The $merge stage writes the documents to the monthlytotals collection in the same database. $merge阶段将文档写入同一数据库中的monthlotals集合。If the stage finds an existing document in the collection that matches on the _id field, the stage uses a pipeline to add the thumbsup votes and the thumbsdown votes.如果该阶段在集合中找到与_id字段匹配的现有文档,则该阶段使用管道添加thumbsup投票和thumbsdown投票。

  • This pipeline cannot directly accesses the fields from the results document. 此管道无法直接访问结果文档中的字段。To access the thumbsup field and the thumbsdown field in the results document, the pipeline uses the $$new variable; i.e. $$new.thumbsup and $new.thumbsdown.为了访问结果文档中的thumbsup字段和thumbsdown字段,管道使用$$new变量;即$$new.thumbsup$new.thumbsdown
  • This pipeline can directly accesses the thumbsup field and the thumbsdown field in the existing document in the collection; i.e. $thumbsup and $thumbsdown.该管道可以直接访问集合中现有文档中的thumbsup字段和thumbsdown字段;即$thumbsup$thumbsdown

The resulting document replaces the existing document.生成的文档将替换现有文档。

To view documents in the monthlytotals collection after the merge operation, run the following operation:要在合并操作后查看monthlotals集合中的文档,请运行以下操作:

db.monthlytotals.find()

The collection contains the following document:该集合包含以下文档:

{ "_id" : "2019-05", "thumbsup" : 40, "thumbsdown" : 41 }

Use Variables to Customize the Merge使用变量自定义合并

You can use variables in the $merge stage whenMatched field. Variables must be defined before they can be used.您可以在$merge阶段的whenMatched字段中使用变量。变量必须先定义,然后才能使用。

Define variables in one or both of the following:在以下一项或两项中定义变量:

To use variables in whenMatched:要在whenMatched中使用变量,请执行以下操作:

Specify the double dollar sign ($$) prefix together with the variable name in the form $$<variable_name>. $$<variable_name>的形式指定双美元符号($$)前缀和变量名。For example, $$year. If the variable is set to a document, you can also include a document field in the form $$<variable_name>.<field>. For example, $$year.month.例如,$$year。如果变量设置为文档,您还可以以$$<variable_name>.<field>的形式包含文档字段。例如,$$year.month

The tabs below demonstrate behavior when variables are defined in the merge stage, the aggregate command, or both.下面的选项卡演示了在合并阶段、聚合命令或两者中定义变量时的行为。

Merge Stage合并阶段

Use Variables Defined in the Merge Stage使用在合并阶段定义的变量

You can define variables in the $merge stage let and use the variables in the whenMatched field.您可以在$merge阶段let中定义变量,并在whenMatched字段中使用这些变量。

Example:

db.cakeSales.insertOne( [
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
] )

db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
let : { year: "2020" },
whenMatched: [ {
$addFields: { "salesYear": "$$year" }
} ]
}
} ],
cursor: {}
} )

db.cakeSales.find()

The example:示例:

  • creates a collection named cakeSales创建一个名为cakeSales的集合
  • runs an aggregate command that defines a year variable in the $merge let and adds the year to cakeSales using whenMatched运行一个聚合命令,在$merge let中定义year变量,并使用whenMatchedyear添加到cakeSales
  • retrieves the cakeSales document检索cakeSales文档

Output:输出:

{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580,
"salesTrend" : "up", "salesYear" : "2020" }
Aggregate Command聚合命令

Use Variables Defined in the Aggregate Command使用聚合命令中定义的变量

New in version 5.0.在版本5.0中新增。

You can define variables in the aggregate command let and use the variables in the $merge stage whenMatched field.您可以在aggregate命令let中定义变量,并在$merge阶段whenMatched字段中使用这些变量。

Example:示例:

db.cakeSales.insertOne(
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
)

db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
whenMatched: [ {
$addFields: { "salesYear": "$$year" } }
] }
}
],
cursor: {},
let : { year: "2020" }
} )

db.cakeSales.find()

The example:示例:

  • creates a collection named cakeSales创建一个名为cakeSales的集合cakeSales
  • runs an aggregate command that defines a year variable in the aggregate command let and adds the year to cakeSales using whenMatched运行一个aggregate命令,在aggregate命令let中定义year变量,并使用whenMatched将年份添加到cakeSales
  • retrieves the cakeSales document检索cakeSales文档

Output:输出:

{ "_id" : 1, "flavor" : "chocolate", "salesTotal" : 1580,
"salesTrend" : "up", "salesYear" : "2020" }
Merge and Aggregate合并和聚合

Use Variables Defined in the Merge Stage and Aggregate Command使用在合并阶段和聚合命令中定义的变量

You can define variables in the $merge stage and, starting in MongoDB 5.0, the aggregate command.您可以在$merge阶段定义变量,并从MongoDB 5.0开始定义aggregate命令。

If two variables with the same name are defined in the $merge stage and the aggregate command, the $merge stage variable is used.如果在$merge阶段和aggregate命令中定义了两个同名变量,则使用$merge阶段变量。

In this example, the year: "2020" $merge stage variable is used instead of the year: "2019" aggregate command variable:在这个例子中,使用year: "2020" $merge阶段变量而不是year: "2019" aggregate命令变量:

db.cakeSales.insertOne(
{ _id: 1, flavor: "chocolate", salesTotal: 1580,
salesTrend: "up" }
)

db.runCommand( {
aggregate: db.cakeSales.getName(),
pipeline: [ {
$merge: {
into: db.cakeSales.getName(),
let : { year: "2020" },
whenMatched: [ {
$addFields: { "salesYear": "$$year" }
} ]
}
} ],
cursor: {},
let : { year: "2019" }
} )

db.cakeSales.find()

Output:输出:

{
_id: 1,
flavor: 'chocolate',
salesTotal: 1580,
salesTrend: 'up',
salesYear: '2020'
}
C#

The C# examples on this page use the sample_mflix database from the Atlas sample datasets. 本页上的C#示例使用Atlas示例数据集中的sample_mflix数据库。To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see Get Started in the MongoDB .NET/C# Driver documentation.要了解如何创建免费的MongoDB Atlas集群并加载示例数据集,请参阅MongoDB .NET/C#驱动程序文档中的入门

The following Movie class models the documents in the sample_mflix.movies collection:以下Movie类对sample_mflix.movies集合中的文档进行建模:

public class Movie
{
public ObjectId Id { get; set; }

public int Runtime { get; set; }

public string Title { get; set; }

public string Rated { get; set; }

public List<string> Genres { get; set; }

public string Plot { get; set; }

public ImdbData Imdb { get; set; }

public int Year { get; set; }

public int Index { get; set; }

public string[] Comments { get; set; }

[BsonElement("lastupdated")]
public DateTime LastUpdated { get; set; }
}

Note

ConventionPack for Pascal CasePascal大小写的约定包

The C# classes on this page use Pascal case for their property names, but the field names in the MongoDB collection use camel case. To account for this difference, you can use the following code to register a ConventionPack when your application starts:此页面上的C#类使用Pascal大小写作为其属性名,但MongoDB集合中的字段名使用驼峰大小写。为了解释这种差异,您可以在应用程序启动时使用以下代码注册ConventionPack

var camelCaseConvention = new ConventionPack { new CamelCaseElementNameConvention() };
ConventionRegistry.Register("CamelCase", camelCaseConvention, type => true);

To use the MongoDB .NET/C# driver to add a $merge stage to an aggregation pipeline, call the Merge() method on a PipelineDefinition object.要使用MongoDB .NET/C#驱动程序向聚合管道添加$merge阶段,请在PipelineDefinition对象上调用Merge()方法。

When you call the Merge() method, you must pass an instance of the MergeStageOptions class. This object lets you specify options for the $merge stage, such as how to handle matching documents.调用Merge()方法时,必须传递MergeStageOptions类的实例。此对象允许您为$merge阶段指定选项,例如如何处理匹配的文档。

The following example creates a pipeline stage that merges the documents in the pipeline into the movies collection. The MergeStageOptions object specifies the following options:以下示例创建了一个管道阶段,将管道中的文档合并到movies集合中。MergeStageOptions对象指定以下选项:

  • The OnFieldNames option specifies that the operation should use the "id" and "title" fields to find matching documents in the source collection and the movies collection.OnFieldNames选项指定操作应使用"id""title"字段在源集合和电影集合中查找匹配的文档。
  • The WhenMatched option specifies that if a document in the source collection matches a document in the movies collection, it should replace the document in the movies collection.WhenMatched选项指定,如果源集合中的文档与movies集合中的某个文档匹配,则应替换movies集合中的文档。
  • The WhenNotMatched option specifies that if a document in the source collection does not match a document in the movies collection, it should be inserted into the movies collection.WhenNotMatched选项指定,如果源集合中的文档与movies集合中的文件不匹配,则应将其插入到movies集合中。
var movieCollection = client
.GetDatabase("sample_mflix")
.GetCollection<Movie>("movies");

var pipeline = new EmptyPipelineDefinition<Movie>()
.Merge(movieCollection,
new MergeStageOptions<Movie>(
{
OnFieldNames = new List<string>() {"id", "title"},
WhenMatched = MergeStageWhenMatched.Replace,
WhenNotMatched = MergeStageWhenNotMatched.Insert,
});
Node.js

The Node.js examples on this page use the sample_mflix database from the Atlas sample datasets. 本页上的Node.js示例使用Atlas示例数据集中的sample_mflix数据库。To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see Get Started in the MongoDB Node.js driver documentation.要了解如何创建免费的MongoDB Atlas集群并加载示例数据集,请参阅MongoDB Node.js驱动程序文档中的入门

To use the MongoDB Node.js driver to add a $merge stage to an aggregation pipeline, use the $merge operator in a pipeline object.要使用MongoDB Node.js驱动程序向聚合管道添加$merge阶段,请在管道对象中使用$merge运算符。

The following example creates a pipeline stage that merges the documents in the pipeline into the movies collection. The example includes the following fields:以下示例创建了一个管道阶段,将管道中的文档合并到movies集合中。该示例包括以下字段:

  • The on option specifies that the operation should use the "_id" and "title" fields to find matching documents in the source collection and the movies collection.on选项指定操作应使用"_id""title"字段在源集合和电影集合中查找匹配的文档。
  • The whenMatched option specifies that if a document in the source collection matches a document in the movies collection, it replaces the document in the movies collection.whenMatched选项指定,如果源集合中的文档与movies集合中的某个文档匹配,它将替换movies集合中该文档。
  • The whenNotMatched option specifies that if a document in the source collection does not match a document in the movies collection, the operation inserts the document into the movies collection.whenNotMatched选项指定,如果源集合中的文档与movies集合中的文件不匹配,则操作会将该文档插入到movies集合中。

The example then runs the aggregation pipeline:然后,该示例运行聚合管道:

const pipeline = [
{
$merge: {
into: "movies",
on: ["_id", "title"],
whenMatched: "replace",
whenNotMatched: "insert"
}
}
];

const cursor = collection.aggregate(pipeline);
return cursor;