Database Manual / Reference / Query Language / Aggregation Stages

$changeStream (aggregation stage)(聚合阶段)

Definition定义

$changeStream

Returns a Change Stream cursor on a collection, a database, or an entire cluster. Must be used as the first stage in an aggregation pipeline.在集合、数据库或整个集群上返回更改流游标。必须用作聚合管道的第一阶段。

The $changeStream stage has the following syntax:$changeStream阶段具有以下语法:

{
$changeStream: {
allChangesForCluster: <boolean>,
fullDocument: <string>,
fullDocumentBeforeChange: <string>,
resumeAfter: <document>
showExpandedEvents: <boolean>,
startAfter: <document>
startAtOperationTime: <timestamp>
}
}
Parameter参数Description描述
allChangesForClusterOptional: Sets whether the change stream should include all changes in the cluster. May only be opened on the admin database.可选:设置更改流是否应包括集群中的所有更改。只能在admin数据库上打开。
fullDocumentOptional: Specifies whether change notifications include a copy of the full document when modified by update operations.可选:指定更改通知在通过update操作修改时是否包括完整文档的副本。
  • default: Change notifications do not include the full document for update operations.:更改通知不包括update操作的完整文档。
  • required: Change notifications includes a copy of the modified document as it appeared immediately after the change. If the document cannot be found, the change stream throws an error.:更改通知包括更改后立即出现的修改文档的副本。如果找不到文档,则更改流会抛出错误。
    To use this option, you must first use the collMod command to enable the changeStreamPreAndPostImages option.要使用此选项,您必须首先使用collMod命令启用changeStreamPreAndPostImages选项。
    New in version 6.0.在版本6.0中新增。版本6.0中的新功能。
  • updateLookup: Change notifications includes a copy of the document modified by the change. This document is the current majority-committed document or null if it no longer exists.:变更通知包括由变更修改的文档副本。此文档是当前大多数人提交的文档,如果已不存在,则为null
  • whenAvailable: Change notification includes a copy of the modified document as it appeared immediately after the change or null if the document is unavailable.:更改通知包括更改后立即出现的修改文档的副本,如果文档不可用,则为null
    To use this option, you must first use the collMod command to enable the changeStreamPreAndPostImages option.要使用此选项,您必须首先使用collMod命令启用changeStreamPreAndPostImages选项。
    New in version 6.0.在版本6.0中新增。版本6.0中的新功能。
In the case of partial updates, the change notification also provides a description of the change.在部分更新的情况下,更改通知还提供了更改的描述。
fullDocumentBeforeChangeInclude the full document from before the change. This field accepts the following values:包括更改前的完整文档。此字段接受以下值:
  • off: Disables inclusion of the document from before the change.:禁止包含更改前的文档。
  • whenAvailable: Includes document from before the change. The query does not fail if the unmodified document is not available.:包括更改前的文档。如果未修改的文档不可用,则查询不会失败。
  • required: Includes document from before the change. The query fails if the unmodified document is not available.:包括更改前的文档。如果未修改的文档不可用,则查询失败。
resumeAfterOptional. Specifies a resume token as the logical starting point for the change stream. 可选。指定恢复令牌作为更改流的逻辑起点。Cannot be used to resume the change stream after an invalidate event.在发生invalidate事件后,无法用于恢复更改流。
resumeAfter is mutually exclusive with startAfter and startAtOperationTime.resumeAfterstartAfterstartAtOperationTime互斥。
showExpandedEventsSpecifies whether to include additional change events, such as such as DDL and index operations.指定是否包含其他更改事件,例如DDL和索引操作。
New in version 6.0.在版本6.0中新增。版本6.0中的新功能。
startAfterOptional. Specifies a resume token as the logical starting point for the change stream. 可选。指定恢复令牌作为更改流的逻辑起点。Unlike resumeAfter, startAfter can resume notifications after an invalidate event by creating a new change stream.resumeAfter不同,startAfter可以通过创建新的更改流在invalidate事件后恢复通知。

startAfter is mutually exclusive with resumeAfter and startAtOperationTime.startAfterresumeAfterstartAtOperationTime互斥。

startAtOperationTimeSpecifies a time as the logical starting point for the change stream. Cannot be used with resumeAfter or startAfter fields.指定时间作为更改流的逻辑起点。不能与resumeAfterstartAfter字段一起使用。

Stable API Support稳定的API支持

Change streams are included in Stable API V1. However, the showExpandedEvents option is not included in Stable API V1.变更流包含在稳定的API V1中。但是,showExpandedEvents选项未包含在Stable API V1中。

Examples示例

MongoDB Shell

To create a change stream cursor using the aggregation stage, run the aggregate command.要使用聚合阶段创建更改流游标,请运行aggregate命令。

var cur = db.names.aggregate( [
{ $changeStream: {} }
] )

To open the cursor, run cur.要打开游标,请运行cur

When the change stream detects a change, the next() method returns a change event notification. For example, after running cur.next(), MongoDB returns a document similar to the following:当更改流检测到更改时,next()方法返回更改事件通知。例如,在运行cur.next()后,MongoDB返回一个类似于以下内容的文档:

{
"_id": {
_data: "8262E2EE54000000022B022C0100296E5A100448E5E3DD01364019AE8FE8C6859527E046645F6964006462E2EE54C8756C0D5CF6F0720004"
},
"operationType": "insert",
"clusterTime": Timestamp({ t: 1659039316, i: 2 }),
"wallTime": ISODate("2022-07-28T20:15:16.148Z"),
"fullDocument": {
"_id": ObjectId("62e2ee54c8756c0d5cf6f072"),
"name": "Walker Percy"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": { _id: ObjectId("62e2ee54c8756c0d5cf6f072") }
}
C#

To use the MongoDB .NET/C# driver to add a $changeStream stage to an aggregation pipeline, call the ChangeStream() method on a PipelineDefinition object.要使用MongoDB .NET/C#驱动程序将$changeStream阶段添加到聚合管道中,请在PipelineDefinition对象上调用ChangeStream()方法。

The following example creates a pipeline stage that returns a change stream cursor:以下示例创建了一个返回更改流游标的管道阶段:

var pipeline = new EmptyPipelineDefinition<Movie>()
.ChangeStream();

You can use a ChangeStreamStageOptions object to customize the behavior of the change stream. 您可以使用ChangeStreamStageOptions对象自定义更改流的行为。The following example performs the same $changeStream operation as the previous example, but specifies the following options:以下示例执行与前一个示例相同的$changeStream操作,但指定了以下选项:

  • The FullDocument option specifies that change notifications don't include a copy of the full document when modified by update operations.FullDocument选项指定,当通过更新操作修改时,更改通知不包括完整文档的副本。
  • The StartAtOperationTime option specifies the logical starting point for the change stream.StartAtOperationTime选项指定更改流的逻辑起点。
var changeStreamOptions = new ChangeStreamStageOptions()
{
FullDocument = ChangeStreamFullDocumentOption.Default,
StartAtOperationTime = new BsonTimestamp(300),
};

var pipeline = new EmptyPipelineDefinition<Movie>()
.ChangeStream(changeStreamOptions);

Note

Prefer the Watch() Method较推荐Watch()方法

Where possible, use the IMongoCollection<TDocument>.Watch() method instead of this aggregation stage. Use the ChangeStream() method only if subsequent stages project away the resume token (_id) or if you don't want the resulting cursor to automatically resume.如果可能,请使用IMongoCollection<TDocument>.Watch()方法,而不是此聚合阶段。仅当后续阶段投影掉恢复令牌(_id)或您不希望生成的游标自动恢复时,才使用ChangeStream()方法。

Node.js

You can use both the watch() method and the aggregate() method to execute a $changeStream operation. $changeStream returns a ChangeStreamCursor when you pass the aggregation pipeline to the watch() method on a MongoDB Collection object. 您可以使用watch()方法和aggregate()方法来执行$changeStream操作。当您将聚合管道传递给MongoDB Collection对象上的watch()方法时,$changeStream返回一个ChangeStreamCursor$changeStream returns an AggregationCursor when you pass the aggregation pipeline to the aggregate() method.当您将聚合管道传递给aggregate()方法时,$changeStream返回AggregationCursor

Important

$changeStream Resumability$changeStream可恢复性

If you pass a change stream to the aggregate() method, the change stream can not resume. A change stream only resumes if you pass it to the watch() method. 如果将更改流传递给aggregate()方法,则更改流无法恢复。只有当你将更改流传递给watch()方法时,它才会恢复。To learn more about resumability, see Resume a Change Stream.要了解有关可恢复性的更多信息,请参阅恢复更改流

The following example creates and executes a pipeline that returns a ChangeStreamCursor:以下示例创建并执行一个返回ChangeStreamCursor的管道:

const pipeline = [{ $changeStream: {} }];

const changeStream = collection.watch(pipeline);
return changeStream;

You can use a ChangeStreamOptions object to customize the behavior of the change stream. The following example performs the same $changeStream operation as the previous example, but specifies the following options:您可以使用ChangeStreamOptions对象自定义更改流的行为。以下示例执行与前一个示例相同的$changeStream操作,但指定了以下选项:

  • The fullDocument option specifies that if an update operation modifies a document, the change notifications include a copy of the full document.fullDocument选项指定,如果更新操作修改了文档,则更改通知将包括完整文档的副本。
  • The startAtOperationTime option specifies the logical starting point for the change stream.startAtOperationTime选项指定更改流的逻辑起点。
const pipeline = [
{
$changeStream: {
fullDocument: 'updateLookup',
startAtOperationTime: 3000
}
}
];

const changeStream = collection.watch(pipeline);
return changeStream;

Learn More了解更多

For more information on change stream notifications, see Change Events.有关更改流通知的更多信息,请参阅更改事件

To learn more about related pipeline stages, see the $changeStreamSplitLargeEvent guide.要了解有关相关管道阶段的更多信息,请参阅$changeStreamSplitLargeEvent指南。