Database Manual / Reference / mongosh Methods / Collections

db.collection.watch() (mongosh method方法)

Definition定义

db.collection.watch( pipeline, options )

Important

mongosh Method方法

This page documents a mongosh method. This is not the documentation for database commands or language-specific drivers, such as Node.js.本页记录了一种mongosh方法。这不是数据库命令或特定语言驱动程序(如Node.js)的文档。

For the database command, see the aggregate command with the $changeStream aggregation stage.对于数据库命令,请参阅带有$changeStream聚合阶段的aggregate命令。

For MongoDB API drivers, refer to the language-specific MongoDB driver documentation.有关MongoDB API驱动程序,请参阅特定语言的MongoDB驱动程序文档

For replica sets and sharded clusters only仅适用于副本集和分片集群

Opens a change stream cursor on the collection.在集合上打开更改流游标

Parameter参数Type类型Description描述
pipelinearray数组

Optional. 可选。An Aggregation Pipeline consisting of one or more of the following aggregation stages:由以下一个或多个聚合阶段组成的聚合管道

Specify a pipeline to filter/modify the change events output.指定一个管道来筛选/修改更改事件输出。

Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.从MongoDB 4.2开始,如果更改流聚合管道修改了事件的_id字段,则更改流将抛出异常。

optionsdocument文档Optional. 可选。Additional options that modify the behavior of watch().修改watch()行为的其他选项。

The options document can contain the following fields and values:options文档可以包含以下字段和值:

Field字段Type类型Description描述
resumeAfterdocument文档

Optional. 可选。Directs watch() to attempt resuming notifications starting after the operation specified in the resume token.指示watch()尝试在恢复令牌中指定的操作之后恢复通知。

Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.每个更改流事件文档都包含一个恢复令牌作为_id字段。传递更改事件文档的整个_id字段,该字段表示您要在之后恢复的操作。

resumeAfter is mutually exclusive with startAfter and startAtOperationTime.resumeAfterstartAfterstartAtOperationTime互斥。

startAfterdocument文档

Optional. 可选。Directs watch() to attempt starting a new change stream after the operation specified in the resume token. Allows notifications to resume after an invalidate event.指示watch()在恢复令牌中指定的操作之后尝试启动新的更改流。允许在无效事件后恢复通知。

Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.每个更改流事件文档都包含一个恢复令牌作为_id字段。传递更改事件文档的整个_id字段,该字段表示您要在之后恢复的操作。

startAfter is mutually exclusive with resumeAfter and startAtOperationTime.startAfterresumeAfterstartAtOperationTime互斥。

fullDocumentstring字符串

Optional. 可选。By default, watch() returns the delta of those fields modified by an update operation, instead of the entire updated document.默认情况下,watch()返回由更新操作修改的字段的增量,而不是整个更新的文档。

Set fullDocument to "updateLookup" to direct watch() to look up the most current majority-committed version of the updated document. fullDocument设置为"updateLookup"以直接watch()来查找更新文档的最新多数提交版本。watch() returns a fullDocument field with the document lookup in addition to the updateDescription delta.watch()返回一个fullDocument字段,其中除了updateDescription增量外,还包含文档查找。

Starting in MongoDB 6.0, you can set fullDocument to:从MongoDB 6.0开始,您可以将fullDocument设置为:

  • "whenAvailable" to output the document post-image, if available, after the document was inserted, replaced, or updated.在插入、替换或更新文档后输出文档后映像(如果可用)。
  • "required" to output the document post-image after the document was inserted, replaced, or updated. Raises an error if the post-image is not available.在插入、替换或更新文档后输出文档后映射。如果后映射不可用,则引发错误。
fullDocumentBeforeChangestring字符串

Optional.可选。

Starting in MongoDB 6.0, you can use the new fullDocumentBeforeChange field and set it to:从MongoDB 6.0开始,您可以使用新的fullDocumentBeforeChange字段并将其设置为:

  • "whenAvailable" to output the document pre-image, if available, before the document was replaced, updated, or deleted.在替换、更新或删除文档之前输出文档前映像(如果可用)。
  • "required" to output the document pre-image before the document was replaced, updated, or deleted. Raises an error if the pre-image is not available.在替换、更新或删除文档之前输出文档前映像。如果前映像不可用,则引发错误。
  • "off" to suppress the document pre-image. 以抑制文档前映像。"off" is the default.是默认值。
batchSizeint

Optional. 可选。The maximum number of documents that can be returned in each batch of a change stream. 变更流的每批中可以返回的最大文档数。By default, watch() has an initial batch size of the lesser of 101 documents or 16 mebibytes (MiB) worth of documents. 默认情况下,watch()的初始批处理大小为101个文档或价值16兆字节(MiB)的文档中的较小者。Subsequent batches have a maximum size of 16 mebibytes. This option can enforce a smaller limit than 16 MiB, but not a larger one. 后续批次的最大大小为16兆字节。此选项可以强制执行小于16兆字节的限制,但不能强制执行更大的限制。When set, the batchSize is the lesser of batchSize documents or 16 MiB worth of documents.设置后,batchSizebatchSize文档或价值16 MiB的文档中的较小值。

Has the same functionality as cursor.batchSize().具有与cursor.batchSize()相同的功能。

maxAwaitTimeMSint

Optional. 可选。The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch.服务器在返回空批之前等待新数据更改报告给更改流游标的最长时间(毫秒)。

Defaults to 1000 milliseconds.默认值为1000毫秒。

collationdocument文档

Optional. 可选。Pass a collation document to specify a collation for the change stream cursor.传递一个排序规则文档,为更改流游标指定排序规则。

Defaults to simple binary comparison if omitted.如果省略,则默认为simple二进制比较。

showExpandedEventsboolean布尔值

Optional. 可选。Starting in MongoDB 6.0, change streams support change notifications for DDL events, like the createIndexes and dropIndexes events. 从MongoDB 6.0开始,更改流支持DDL事件的更改通知,如createIndexesdropIndexes事件。To include expanded events in a change stream, create the change stream cursor using the showExpandedEvents option.要在更改流中包含扩展事件,请使用showExpandedEvents选项创建更改流游标。

New in version 6.0.在版本6.0中新增。

startAtOperationTimeTimestamp

Optional. 可选。The starting point for the change stream. If the specified starting point is in the past, it must be in the time range of the oplog. 变更流的起点。如果指定的起点在过去,则它必须在oplog的时间范围内。To check the time range of the oplog, see rs.printReplicationInfo().要检查oplog的时间范围,请参阅rs.printReplicationInfo()

startAtOperationTime is mutually exclusive with resumeAfter and startAfter.startAtOperationTimeresumeAfterstartAfter互斥。

Returns:返回A cursor that remains open as long as a connection to the MongoDB deployment remains open and the collection exists. 只要与MongoDB部署的连接保持打开并且集合存在,游标就会保持打开状态。See Change Events for examples of change event documents.有关变更事件文档的示例,请参阅变更事件

Compatibility兼容性

This method is available in deployments hosted in the following environments:此方法在以下环境中托管的部署中可用:

  • MongoDB Atlas: The fully managed service for MongoDB deployments in the cloud:云中MongoDB部署的完全托管服务

Note

This command is supported in all MongoDB Atlas clusters. 所有MongoDB Atlas集群都支持此命令。For information on Atlas support for all commands, see Unsupported Commands.有关Atlas支持所有命令的信息,请参阅不支持的命令

  • MongoDB Enterprise: The subscription-based, self-managed version of MongoDB:MongoDB的基于订阅的自我管理版本
  • MongoDB Community: The source-available, free-to-use, and self-managed version of MongoDB:MongoDB的源代码可用、免费使用和自我管理版本

Availability可用性

Deployment部署

db.collection.watch() is available for replica set and sharded cluster deployments :可用于副本集和分片群集部署:

Storage Engine存储引擎

You can only use db.collection.watch() with the Wired Tiger storage engine.您只能将db.collection.watch()Wired Tiger存储引擎一起使用。

Read Concern majority Support读取关注majority支持

Change streams are available regardless of the "majority" read concern support; that is, read concern majority support can be either enabled (default) or disabled to use change streams.无论是否支持"majority"读取关注,都可以使用更改流;也就是说,可以启用(默认)或禁用读取关注majority支持以使用更改流。

Behavior行为

  • db.collection.watch() only notifies on data changes that have persisted to a majority of data-bearing members.仅通知大多数数据承载成员持续的数据更改。
  • The change stream cursor remains open until one of the following occurs:更改流游标将保持打开状态,直到出现以下情况之一:

    • The cursor is explicitly closed.游标已明确关闭。
    • An invalidate event occurs; for example, a collection drop or rename.发生无效事件;例如,删除或重命名集合。
    • The connection to the MongoDB deployment closes or times out. See Behavior for more information.与MongoDB部署的连接关闭或超时。有关更多信息,请参阅行为
    • If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close. The closed change stream cursor may not be fully resumable.如果部署是分片集群,则分片删除可能会导致打开的更改流游标关闭。关闭的更改流游标可能无法完全恢复。

Resumability可恢复性

Unlike the MongoDB Drivers, mongosh does not automatically attempt to resume a change stream cursor after an error. 与MongoDB驱动程序不同,mongosh在发生错误后不会自动尝试恢复更改流游标。The MongoDB drivers make one attempt to automatically resume a change stream cursor after certain errors.MongoDB驱动程序在出现某些错误后尝试自动恢复更改流游标。

db.collection.watch() uses information stored in the oplog to produce the change event description and generate a resume token associated to that operation. 使用oplog中存储的信息来生成更改事件描述,并生成与该操作相关联的恢复令牌。If the operation identified by the resume token passed to the resumeAfter or startAfter option has already dropped off the oplog, db.collection.watch() cannot resume the change stream.如果传递给resumeAfterstartAfter选项的恢复令牌标识的操作已经从oplog中删除,db.collection.watch()将无法恢复更改流。

See Resume a Change Stream for more information on resuming a change stream.有关恢复更改流的更多信息,请参阅恢复更改流

Note

  • You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. 无效事件(例如,集合删除或重命名)关闭更改流后,您不能使用resumeAfter来恢复更改流。Instead, you can use startAfter to start a new change stream after an invalidate event.相反,您可以使用startAfter无效事件后启动新的更改流。
  • If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close. The closed change stream cursor may not be fully resumable.如果部署是分片集群,则分片删除可能会导致打开的更改流游标关闭。关闭的更改流游标可能无法完全恢复。

Note

You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. 无效事件(例如,集合删除或重命名)关闭更改流后,您不能使用resumeAfter来恢复更改流。Instead, you can use startAfter to start a new change stream after an invalidate event.相反,您可以使用startAfter无效事件后启动新的更改流。

Full Document Lookup of Update Operations更新操作的完整文档查找

By default, the change stream cursor returns specific field changes/deltas for update operations. 默认情况下,更改流游标返回用于更新操作的特定字段更改/增量。You can also configure the change stream to look up and return the current majority-committed version of the changed document. 您还可以配置更改流,以查找并返回已更改文档的当前多数提交版本。Depending on other write operations that may have occurred between the update and the lookup, the returned document may differ significantly from the document at the time of the update.根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档有很大不同。

Depending on the number of changes applied during the update operation and the size of the full document, there is a risk that the size of the change event document for an update operation is greater than the 16MB BSON document limit. If this occurs, the server closes the change stream cursor and returns an error.根据更新操作期间应用的更改数量和完整文档的大小,存在更新操作的更改事件文档的大小大于16MB BSON文档限制的风险。如果发生这种情况,服务器将关闭更改流游标并返回错误。

Access Control访问控制

When running with access control, the user must have the find and changeStream privilege actions on the collection resource. 当使用访问控制运行时,用户必须对集合资源具有findchangeStream权限操作。That is, a user must have a role that grants the following privilege:也就是说,用户必须具有授予以下权限角色

{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }

The built-in read role provides the appropriate privileges.内置的read角色提供适当的权限。

Cursor Iteration游标迭代

MongoDB provides multiple ways to iterate on a cursor.MongoDB提供了多种方法来迭代游标。

The cursor.hasNext() method blocks and waits for the next event. To monitor the watchCursor cursor and iterate over the events, use hasNext() like this:cursor.hasNext()方法会阻塞并等待下一个事件。要监视watchCursor游标并迭代事件,请使用hasNext(),如下所示:

while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}

The cursor.tryNext() method is non-blocking. To monitor the watchCursor cursor and iterate over the events, use tryNext() like this:cursor.tryNext()方法是非阻塞的。要监视watchCursor游标并迭代事件,请使用tryNext(),如下所示:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

Examples示例

Open a Change Stream打开更改流

The following operation opens a change stream cursor against the data.sensors collection:以下操作将在data.sensors集合上打开更改流游标:

watchCursor = db.getSiblingDB("data").sensors.watch()

Iterate the cursor to check for new events. Use the cursor.isClosed() method with the cursor.tryNext() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:迭代游标以检查新事件。将cursor.isClosed()方法与cursor.tryNext()方法一起使用,以确保只有在更改流游标关闭并且最新批中没有剩余对象时,循环才会退出:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

For complete documentation on change stream output, see Change Events.有关更改流输出的完整文档,请参阅更改事件

Note

You cannot use isExhausted() with change streams.您不能将isExhausted()用于更改流

Change Stream with Full Document Update Lookup使用完整文档更新查找更改流

Set the fullDocument option to "updateLookup" to direct the change stream cursor to lookup the most current majority-committed version of the document associated to an update change stream event.fullDocument选项设置为"updateLookup",以引导更改流游标查找与更新更改流事件关联的文档的最新多数提交版本。

The following operation opens a change stream cursor against the data.sensors collection using the fullDocument : "updateLookup" option.以下操作使用fullDocument : "updateLookup"选项在data.sensors集合上打开一个更改流游标。

watchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ fullDocument : "updateLookup" }
)

Iterate the cursor to check for new events. 迭代游标以检查新事件。Use the cursor.isClosed() method with the cursor.tryNext() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:cursor.isClosed()方法与cursor.tryNext()方法一起使用,以确保只有在更改流游标关闭并且最新批中没有剩余对象时,循环才会退出:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

For any update operation, the change event returns the result of the document lookup in the fullDocument field.对于任何更新操作,更改事件都会在fullDocument字段中返回文档查找的结果。

For an example of the full document update output, see change stream update event.有关完整文档更新输出的示例,请参阅更改流更新事件

For complete documentation on change stream output, see Change Events.有关更改流输出的完整文档,请参阅更改事件

Change Streams with Document Pre- and Post-Images使用文档前后映像更改流

Starting in MongoDB 6.0, you can use change stream events to output the version of a document before and after changes (the document pre- and post-images):从MongoDB 6.0开始,您可以使用更改流事件输出更改前后的文档版本(文档前和后映像):

  • The pre-image is the document before it was replaced, updated, or deleted. There is no pre-image for an inserted document.前映像是替换、更新或删除之前的文档。插入的文档没有前映像。
  • The post-image is the document after it was inserted, replaced, or updated. There is no post-image for a deleted document.后映像是插入、替换或更新后的文档。已删除文档没有后映像。
  • Enable changeStreamPreAndPostImages for a collection using db.createCollection(), create, or collMod.使用db.createCollection()createcollMod为集合启用changeStreamPreAndPostImages

Pre- and post-images are not available for a change stream event if the images were:如果映像是以下情况,则前后映像对变更流事件不可用:

  • Not enabled on the collection at the time of a document update or delete operation.在文档更新或删除操作时,集合上未启用。
  • Removed after the pre- and post-image retention time set in expireAfterSeconds.expireAfterSeconds中设置的映像保留时间前后删除。

    • The following example sets expireAfterSeconds to 100 seconds on an entire cluster:以下示例将整个集群上的expireAfterSeconds设置为100秒:

      use admin
      db.runCommand( {
      setClusterParameter:
      { changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 }
      } }
      } )
    • The following example returns the current changeStreamOptions settings, including expireAfterSeconds:以下示例返回当前changeStreamOptions设置,包括expireAfterSeconds

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • Setting expireAfterSeconds to off uses the default retention policy: pre- and post-images are retained until the corresponding change stream events are removed from the oplog.expireAfterSeconds设置为off将使用默认保留策略:保留前后映像,直到从oplog中删除相应的更改流事件。
    • If a change stream event is removed from the oplog, then the corresponding pre- and post-images are also deleted regardless of the expireAfterSeconds pre- and post-image retention time.如果从oplog中删除了更改流事件,则相应的前映像和后映像也将被删除,而不管映像保留时间为expireAfterSeconds之前和之后。

Additional considerations:其他注意事项:

  • Enabling pre- and post-images consumes storage space and adds processing time. Only enable pre- and post-images if you need them.启用前映像和后映像会消耗存储空间并增加处理时间。仅在需要时启用前映像和后映像。
  • Limit the change stream event size to less than 16 mebibytes. To limit the event size, you can:将更改流事件大小限制为小于16兆字节。要限制事件大小,您可以:

    • Limit the document size to 8 megabytes. You can request pre- and post-images simultaneously in the change stream output if other change stream event fields like updateDescription are not large.将文档大小限制为8 MB。如果其他更改流事件字段(如updateDescription)不大,您可以在更改流输出中同时请求前映像和后映像。
    • Request only post-images in the change stream output for documents up to 16 mebibytes if other change stream event fields like updateDescription are not large.如果其他更改流事件字段(如updateDescription)不大,则仅请求更改流输出中最多16兆字节的文档的后映像。
    • Request only pre-images in the change stream output for documents up to 16 mebibytes if:在以下情况下,仅在更改流输出中请求最多16兆字节的文档的前映像:

      • document updates affect only a small fraction of the document structure or content, and文档更新仅影响文档结构或内容的一小部分,以及
      • do not cause a replace change event. A replace event always includes the post-image.不要引起replace事件。replace事件总是包括后映像。
  • To request a pre-image, you set fullDocumentBeforeChange to required or whenAvailable in db.collection.watch(). 要请求预映像,您可以在db.collection.watch()中将fullDocumentBeforeChange设置为必需或可用。To request a post-image, you set fullDocument using the same method.要请求后映像,您可以使用相同的方法设置fullDocument
  • Pre-images are written to the config.system.preimages collection.前映像会写入config.system.preimages集合。

    • The config.system.preimages collection may become large. To limit the collection size, you can set expireAfterSeconds time for the pre-images as shown earlier.config.system.preimages集合可能会变大。要限制集合大小,您可以为预映像设置expireAfterSeconds时间,如前所示。
    • Pre-images are removed asynchronously by a background process.前映像由后台进程异步删除。

Important

Backward-Incompatible Feature向后不兼容功能

Starting in MongoDB 6.0, if you are using document pre- and post-images for change streams, you must disable changeStreamPreAndPostImages for each collection using the collMod command before you can downgrade to an earlier MongoDB version.从MongoDB 6.0开始,如果您将文档前映像和后映像用于更改流,则必须使用collMod命令为每个集合禁用changeStreamPreAndPostImages,然后才能降级到早期的MongoDB版本。

Tip

Create Collection创建集合

Create a temperatureSensor collection that has changeStreamPreAndPostImages enabled:创建启用了changeStreamPreAndPostImagestemperatureSensor(温度传感器)集合:

db.createCollection(
"temperatureSensor",
{ changeStreamPreAndPostImages: { enabled: true } }
)

Populate the temperatureSensor collection with temperature readings:用温度读数填充temperatureSensor集合:

db.temperatureSensor.insertMany( [
{ "_id" : 0, "reading" : 26.1 },
{ "_id" : 1, "reading" : 25.9 },
{ "_id" : 2, "reading" : 24.3 },
{ "_id" : 3, "reading" : 22.4 },
{ "_id" : 4, "reading" : 24.6 }
] )

The following sections show change stream examples for document pre- and post-images that use the temperatureSensor collection.以下部分显示了使用temperatureSensor集合的文档前后映像的更改流示例。

Change Stream with Document Pre-Image使用文档前映像更改流

You use the fullDocumentBeforeChange: "whenAvailable" setting to output the document pre-image, if available. The pre-image is the document before it was replaced, updated, or deleted. There is no pre-image for an inserted document.如果可用,您可以使用完整的fullDocumentBeforeChange: "whenAvailable"设置输出文档预映像。预映像是替换、更新或删除之前的文档。插入的文档没有预映像。

The following example creates a change stream cursor for the temperatureSensor collection using fullDocumentBeforeChange: "whenAvailable":以下示例使用fullDocumentBeforeChange: "whenAvailable"temperatureSensor集合创建更改流游标:

watchCursorFullDocumentBeforeChange = db.temperatureSensor.watch(
[],
{ fullDocumentBeforeChange: "whenAvailable" }
)

The following example uses the cursor to check for new change stream events:以下示例使用游标检查新的更改流事件:

while ( !watchCursorFullDocumentBeforeChange.isClosed() ) {
if ( watchCursorFullDocumentBeforeChange.hasNext() ) {
printjson( watchCursorFullDocumentBeforeChange.next() );
}
}

In the example:在示例中:

  • The while loop runs until the cursor is closed.while循环一直运行到游标关闭为止。
  • hasNext() returns true if the cursor has documents.如果游标有文档,hasNext()返回true

The following example updates the reading field for a temperatureSensor document:以下示例更新了temperatureSensor文档的reading字段:

db.temperatureSensor.updateOne(
{ _id: 2 },
{ $set: { reading: 22.1 } }
)

After the temperatureSensor document is updated, the change event outputs the document pre-image in the fullDocumentBeforeChange field. The pre-image contains the temperatureSensor document reading field before it was updated. For example:更新temperatureSensor文档后,更改事件会在fullDocumentBeforeChange字段中输出文档前映像。前映像包含更新前的temperatureSensor文档reading字段。例如:

{
"_id" : {
"_data" : "82624B21...",
"_typeBits" : BinData(0,"QA==")
},
"operationType" : "update",
"clusterTime" : Timestamp(1649090957, 1),
"ns" : {
"db" : "test",
"coll" : "temperatureSensor"
},
"documentKey" : {
"_id" : 2
},
"updateDescription" : {
"updatedFields" : {
"reading" : 22.1
},
"removedFields" : [ ],
"truncatedArrays" : [ ]
},
"fullDocumentBeforeChange" : {
"_id" : 2,
"reading" : 24.3
}
}

Tip

Change Stream with Document Post-Image使用文档后映像更改流

You use the fullDocument: "whenAvailable" setting to output the document post-image, if available. The post-image is the document after it was inserted, replaced, or updated. There is no post-image for a deleted document.如果可用,您可以使用fullDocument: "whenAvailable"设置输出文档后映像。后映像是插入、替换或更新后的文档。已删除文档没有后映像。

The following example creates a change stream cursor for the temperatureSensor collection using fullDocument: "whenAvailable":以下示例使用fullDocument: "whenAvailable"temperatureSensor集合创建更改流游标:

watchCursorFullDocument = db.temperatureSensor.watch(
[],
{ fullDocument: "whenAvailable" }
)

The following example uses the cursor to check for new change stream events:以下示例使用游标检查新的更改流事件:

while ( !watchCursorFullDocument.isClosed() ) {
if ( watchCursorFullDocument.hasNext() ) {
printjson( watchCursorFullDocument.next() );
}
}

In the example:在示例中:

  • The while loop runs until the cursor is closed.while循环一直运行到游标关闭为止。
  • hasNext() returns true if the cursor has documents.如果游标有文档,hasNext()返回true

The following example updates the reading field for a temperatureSensor document:以下示例更新了temperatureSensor文档的reading字段:

db.temperatureSensor.updateOne(
{ _id: 1 },
{ $set: { reading: 29.5 } }
)

After the temperatureSensor document is updated, the change event outputs the document post-image in the fullDocument field. The post-image contains the temperatureSensor document reading field after it was updated. For example:更新temperatureSensor文档后,更改事件会在fullDocument字段中输出文档后映像。更新后的后映像包含temperatureSensor文档reading字段。例如:

{
"_id" : {
"_data" : "8262474D...",
"_typeBits" : BinData(0,"QA==")
},
"operationType" : "update",
"clusterTime" : Timestamp(1648840090, 1),
"fullDocument" : {
"_id" : 1,
"reading" : 29.5
},
"ns" : {
"db" : "test",
"coll" : "temperatureSensor"
},
"documentKey" : {
"_id" : 1
},
"updateDescription" : {
"updatedFields" : {
"reading" : 29.5
},
"removedFields" : [ ],
"truncatedArrays" : [ ]
}
}

Tip

Change Stream with Aggregation Pipeline Filter使用聚合管道筛选器更改流

Note

Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.从MongoDB 4.2开始,如果更改流聚合管道修改了事件的_id字段,则更改流将抛出异常。

The following operation opens a change stream cursor against the data.sensors collection using an aggregation pipeline to filter only insert events:以下操作使用聚合管道在data.sensors(数据.传感器)集合上打开一个更改流游标,仅筛选插入事件:

watchCursor = db.getSiblingDB("data").sensors.watch(
[
{ $match : {"operationType" : "insert" } }
]
)

Iterate the cursor to check for new events. 迭代游标以检查新事件。Use the cursor.isClosed() method with the cursor.hasNext() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:cursor.isClosed()方法与cursor.hasNext()方法结合使用,以确保循环仅在更改流游标关闭且最新批中没有剩余对象时退出:

while (!watchCursor.isClosed()){
if (watchCursor.hasNext()){
printjson(watchCursor.next());
}
}

The change stream cursor only returns change events where the operationType is insert. For complete documentation on change stream output, see Change Events.更改流游标仅返回operationTypeinsert的更改事件。有关更改流输出的完整文档,请参阅更改事件

Resuming a Change Stream恢复更改流

Every document returned by a change stream cursor includes a resume token as the _id field. 更改流游标返回的每个文档都包含一个简历令牌作为_id字段。To resume a change stream, pass the entire _id document of the change event you want to resume from to either the resumeAfter or startAfter option of watch().要恢复更改流,请将要从中恢复的更改事件的整个_id文档传递给watch()resumeAfterstartAfter选项。

The following operation resumes a change stream cursor against the data.sensors collection using a resume token. This assumes that the operation that generated the resume token has not rolled off the cluster's oplog.以下操作使用恢复令牌在data.sensors(数据.传感器)集合上恢复更改流游标。这假设生成恢复令牌的操作尚未从集群的oplog中滚出。

let watchCursor = db.getSiblingDB("data").sensors.watch();
let firstChange;

while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}

watchCursor.close();

let resumeToken = firstChange._id;

resumedWatchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ resumeAfter : resumeToken }
)

Iterate the cursor to check for new events. 迭代游标以检查新事件。Use the cursor.isClosed() method with the cursor.hasNext() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:cursor.isClosed()方法与cursor.hasNext()方法结合使用,以确保循环仅在更改流游标关闭且最新批中没有剩余对象时退出:

while (!resumedWatchCursor.isClosed()){
if (resumedWatchCursor.hasNext()){
print(resumedWatchCursor.next());
}
}

See Resume a Change Stream for complete documentation on resuming a change stream.有关恢复更改流的完整文档,请参阅恢复更改流