Database Manual / Reference / mongosh Methods / Databases

db.watch() (mongosh method方法)

Definition定义

db.watch( pipeline, options )

For replica sets and sharded clusters only仅适用于副本集和分片集群

Opens a change stream cursor for a database to report on all its non-system collections.打开数据库的更改流游标,以报告其所有非系统集合。

Parameter参数Type类型Description描述
pipelinearray数组

Optional. 可选。An Aggregation Pipeline consisting of one or more of the following aggregation stages:由以下一个或多个聚合阶段组成的聚合管道

Specify a pipeline to filter/modify the change events output.指定一个管道来筛选/修改更改事件输出。

Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.从MongoDB 4.2开始,如果更改流聚合管道修改了事件的_id字段,则更改流将抛出异常。

optionsdocument文档Optional. 可选。Additional options that modify the behavior of db.watch().修改db.watch()行为的其他选项。

The options document can contain the following fields and values:选项文档可以包含以下字段和值:

Field字段Type类型Description描述
resumeAfterdocument文档

Optional. 可选。Specifies a resume token as the logical starting point for the change stream. Cannot be used to resume the change stream after an invalidate event.指定恢复令牌作为更改流的逻辑起点。在发生invalidate事件后,无法用于恢复更改流。

resumeAfter is mutually exclusive with startAfter and startAtOperationTime.resumeAfterstartAfterstartAtOperationTime互斥。

startAfterdocument文档

Optional. 可选。Specifies a resume token as the logical starting point for the change stream. 指定恢复令牌作为更改流的逻辑起点。Unlike resumeAfter, startAfter can resume notifications after an invalidate event by creating a new change stream.resumeAfter不同,startAfter可以通过创建新的更改流在invalidate事件后恢复通知。

startAfter is mutually exclusive with resumeAfter and startAtOperationTime.startAfterresumeAfterstartAtOperationTime互斥。

fullDocumentstring字符串

Optional. 可选。By default, db.watch() returns the delta of those fields modified by an update operation, instead of the entire updated document.默认情况下,db.watch()返回由更新操作修改的字段的增量,而不是整个更新的文档。

Set fullDocument to "updateLookup" to direct db.watch() to look up the most current majority-committed version of the updated document. fullDocument设置为"updateLookup",以指示db.watch()查找更新文档的最新多数提交版本。db.watch() returns a fullDocument field with the document lookup in addition to the updateDescription delta.db.watch()除了updateDescription增量外,还返回一个包含文档查找的fullDocument字段。

Starting in MongoDB 6.0, you can set fullDocument to:从MongoDB 6.0开始,您可以将fullDocument设置为:

  • "whenAvailable" to output the document post-image, if available, after the document was inserted, replaced, or updated.在插入、替换或更新文档后输出文档发布图像(如果可用)。
  • "required" to output the document post-image after the document was inserted, replaced, or updated. Raises an error if the post-image is not available.在插入、替换或更新文档后输出文档帖子图像。如果帖子图像不可用,则引发错误。
fullDocumentBeforeChangestring字符串

Optional. 可选。Default is "off".默认设置为"off"

Starting in MongoDB 6.0, you can use the new fullDocumentBeforeChange field and set it to:从MongoDB 6.0开始,您可以使用新的fullDocumentBeforeChange字段并将其设置为:

  • "whenAvailable" to output the document pre-image, if available, before the document was replaced, updated, or deleted.在替换、更新或删除文档之前输出文档预映像(如果可用)。
  • "required" to output the document pre-image before the document was replaced, updated, or deleted. Raises an error if the pre-image is not available.在替换、更新或删除文档之前输出文档预映像。如果预映像不可用,则引发错误。
  • "off" to suppress the document pre-image. 以抑制文档预图像。"off" is the default.是默认值。
batchSizeint

Optional. 可选。Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster.指定在MongoDB集群的每批响应中返回的最大更改事件数。

Has the same functionality as cursor.batchSize().具有与cursor.batchSize()相同的功能。

maxAwaitTimeMSint

Optional. 可选。The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch.服务器在返回空批之前等待新数据更改报告给更改流游标的最长时间(毫秒)。

Defaults to 1000 milliseconds.

collationdocument文档

Optional. 可选。Pass a collation document to specify a collation for the change stream cursor.传递一个排序规则文档,为更改流游标指定排序规则。

If omitted, defaults to simple binary comparison.如果省略,则默认为simple二进制比较。

startAtOperationTimeTimestamp

Optional. 可选。The starting point for the change stream. If the specified starting point is in the past, it must be in the time range of the oplog. 变更流的起点。如果指定的起点在过去,则它必须在oplog的时间范围内。To check the time range of the oplog, see rs.printReplicationInfo().要检查oplog的时间范围,请参阅rs.printReplicationInfo()

startAtOperationTime is mutually exclusive with resumeAfter and startAfter.startAtOperationTimeresumeAfterstartAfter互斥。

Returns:返回A cursor over the change event documents. 游标悬停在更改事件文档上。See Change Events for examples of change event documents.有关变更事件文档的示例,请参阅变更事件

Compatibility兼容性

This method is available in deployments hosted in the following environments:此方法在以下环境中托管的部署中可用:

  • MongoDB Atlas: The fully managed service for MongoDB deployments in the cloud:云中MongoDB部署的完全托管服务

Note

This command is supported in all MongoDB Atlas clusters. 所有MongoDB Atlas集群都支持此命令。For information on Atlas support for all commands, see Unsupported Commands.有关Atlas支持所有命令的信息,请参阅不支持的命令

  • MongoDB Enterprise: The subscription-based, self-managed version of MongoDB:MongoDB的基于订阅的自我管理版本
  • MongoDB Community: The source-available, free-to-use, and self-managed version of MongoDB:MongoDB的源代码可用、免费使用和自我管理版本

Availability可用性

Deployment部署

db.watch() is available for replica sets and sharded clusters:可用于副本集和分片集群:

  • For a replica set, you can issue db.watch() on any data-bearing member.对于副本集,您可以对任何承载数据的成员发出db.watch()
  • For a sharded cluster, you must issue db.watch() on a mongos instance.对于分片集群,必须对mongos实例发出db.watch()

Storage Engine存储引擎

You can only use db.watch() with the Wired Tiger storage engine.您只能将db.watch()与Wired Tiger存储引擎一起使用。

Read Concern majority Support读取关注majority支持

Change streams are available regardless of the "majority" read concern support; that is, read concern majority support can be either enabled (default) or disabled to use change streams.无论是否支持“多数”读取关注,都可以使用更改流;也就是说,可以启用(默认)或禁用读取关注majority支持以使用更改流。

Behavior行为

  • You cannot run db.watch() on the admin, local, or config database.您无法在adminlocalconfig数据库上运行db.watch()
  • db.watch() only notifies on data changes that have persisted to a majority of data-bearing members.仅通知大多数数据承载成员持续的数据更改。
  • The change stream cursor remains open until one of the following occurs:更改流游标将保持打开状态,直到出现以下情况之一:

    • The cursor is explicitly closed.游标已明确关闭。
    • An invalidate event occurs; for example, a collection drop or rename.发生无效事件;例如,删除或重命名集合。
    • The connection to the MongoDB deployment closes or times out. See Behavior for more information.与MongoDB部署的连接关闭或超时。有关更多信息,请参阅行为
    • If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close. The closed change stream cursor may not be fully resumable.如果部署是分片集群,则分片删除可能会导致打开的更改流游标关闭。关闭的更改流游标可能无法完全恢复。
  • You can run db.watch() for a database that does not exist. However, once the database is created and you drop the database, the change stream cursor closes.您可以对不存在的数据库运行db.watch()。但是,一旦创建了数据库并删除了数据库,更改流游标就会关闭。

Resumability可恢复性

Unlike the MongoDB Drivers, mongosh does not automatically attempt to resume a change stream cursor after an error. 与MongoDB驱动程序不同,mongosh在发生错误后不会自动尝试恢复更改流游标。The MongoDB drivers make one attempt to automatically resume a change stream cursor after certain errors.MongoDB驱动程序在出现某些错误后尝试自动恢复更改流游标。

db.watch() uses information stored in the oplog to produce the change event description and generate a resume token associated to that operation. 使用oplog中存储的信息来生成更改事件描述,并生成与该操作相关联的恢复令牌。If the operation identified by the resume token passed to the resumeAfter or startAfter option has already dropped off the oplog, db.watch() cannot resume the change stream.如果传递给resumeAfterstartAfter选项的恢复令牌标识的操作已经从oplog中删除,db.watch()将无法恢复更改流。

See Resume a Change Stream for more information on resuming a change stream.有关恢复更改流的更多信息,请参阅恢复更改流

Note

  • You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. 无效事件(例如,集合删除或重命名)关闭更改流后,您不能使用resumeAfter来恢复更改流。Instead, you can use startAfter to start a new change stream after an invalidate event.相反,您可以使用startAfter在无效事件后启动新的更改流。
  • If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close. The closed change stream cursor may not be fully resumable.如果部署是分片集群,则分片删除可能会导致打开的更改流游标关闭。关闭的更改流游标可能无法完全恢复。

Note

You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. invalidate事件(例如,集合删除或重命名)关闭更改流后,您不能使用resumeAfter来恢复更改流。Instead, you can use startAfter to start a new change stream after an invalidate event.相反,您可以使用startAfter无效事件后启动新的更改流。

Full Document Lookup of Update Operations更新操作的完整文档查找

By default, the change stream cursor returns specific field changes/deltas for update operations. 默认情况下,更改流游标返回用于更新操作的特定字段更改/增量。You can also configure the change stream to look up and return the current majority-committed version of the changed document. 您还可以配置更改流,以查找并返回已更改文档的当前多数提交版本。Depending on other write operations that may have occurred between the update and the lookup, the returned document may differ significantly from the document at the time of the update.根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档有很大不同。

Depending on the number of changes applied during the update operation and the size of the full document, there is a risk that the size of the change event document for an update operation is greater than the 16MB BSON document limit. If this occurs, the server closes the change stream cursor and returns an error.根据更新操作期间应用的更改数量和完整文档的大小,存在更新操作的更改事件文档的大小大于16MB BSON文档限制的风险。如果发生这种情况,服务器将关闭更改流游标并返回错误。

Access Control访问控制

When running with access control, the user must have the find and changeStream privilege actions on the database resource. 当使用访问控制运行时,用户必须对数据库资源具有findchangeStream权限操作。That is, a user must have a role that grants the following privilege:也就是说,用户必须具有授予以下权限角色

{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream"] }

The built-in read role provides the appropriate privileges.内置的read角色提供适当的权限。

Cursor Iteration游标迭代

MongoDB provides multiple ways to iterate on a cursor.MongoDB提供了多种方法来迭代游标。

The cursor.hasNext() method blocks and waits for the next event. cursor.hasNext()方法会阻塞并等待下一个事件。To monitor the watchCursor cursor and iterate over the events, use hasNext() like this:要监视watchCursor游标并迭代事件,请使用hasNext(),如下所示:

while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}

The cursor.tryNext() method is non-blocking. To monitor the watchCursor cursor and iterate over the events, use tryNext() like this:cursor.tryNext()方法是非阻塞的。要监视watchCursor游标并迭代事件,请使用tryNext(),如下所示:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

Example示例

The following operation in mongosh opens a change stream cursor on the hr database. mongosh中的以下操作将在hr数据库上打开更改流游标。The returned cursor reports on data changes to all the non-system collections in that database.返回的游标报告该数据库中所有非system集合的数据更改。

watchCursor = db.getSiblingDB("hr").watch()

Iterate the cursor to check for new events. Use the cursor.isClosed() method with the cursor.tryNext() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:迭代游标以检查新事件。将cursor.isClosed()方法与cursor.tryNext()方法一起使用,以确保只有在更改流游标关闭并且最新批中没有剩余对象时,循环才会退出:

while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}

For complete documentation on change stream output, see Change Events.有关更改流输出的完整文档,请参阅更改事件

Note

You cannot use isExhausted() with change streams.您不能将isExhausted()用于更改流