Mongo.watch()
On this page本页内容
Definition定义
Mongo.watch( pipeline, options )
-
For replica sets and sharded clusters only仅适用于复制集和分片集群Opens a change stream cursor for a replica set or a sharded cluster to report on all its non-打开副本集或分片集群的更改流游标,以报告其数据库中的所有非系统集合,system
collections across its databases, with the exception of theadmin
,local
, and theconfig
databases.admin
数据库、local
数据库和config
数据库除外。Parameter参数Type类型Description描述pipeline
array Optional.可选的。An Aggregation Pipeline consisting of one or more of the following aggregation stages:由以下一个或多个聚合阶段组成的聚合管道:$addFields
$match
$project
$replaceRoot
$replaceWith
(Available starting in MongoDB 4.2)(从MongoDB 4.2开始提供)$redact
$set
(Available starting in MongoDB 4.2)(从MongoDB 4.2开始提供)$unset
(Available starting in MongoDB 4.2)(从MongoDB 4.2开始提供)
Specify a pipeline to filter/modify the change events output.指定用于筛选/修改更改事件输出的管道。
Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.从MongoDB 4.2开始,如果变更流聚合管道修改了事件的_id
字段,则变更流将抛出异常。options
document Optional.可选的。Additional options that modify the behavior of修改Mongo.watch()
.Mongo.watch()
行为的其他选项。Theoptions
document can contain the following fields and values:options
文档可以包含以下字段和值:Field字段Type类型Description描述resumeAfter
document Optional.可选的。Directs指示Mongo.watch()
to attempt resuming notifications starting after the operation specified in the resume token.Mongo.watch()
尝试在恢复令牌中指定的操作之后开始恢复通知。
Each change stream event document includes a resume token as the每个变更流事件文档都包括一个恢复令牌作为_id
field._id
字段。Pass the entire传递更改事件文档的整个_id
field of the change event document that represents the operation you want to resume after._id
字段,该字段表示要在之后恢复的操作。
resumeAfter
is mutually exclusive with与startAfter
andstartAtOperationTime
.startAfter
和startAtOperationTime
互斥。startAfter
document Optional.可选的。Directs指示Mongo.watch()
to attempt starting a new change stream after the operation specified in the resume token.Mongo.watch()
在恢复令牌中指定的操作之后尝试启动新的更改流。Allows notifications to resume after an invalidate event.允许在发生无效事件后恢复通知。
Each change stream event document includes a resume token as the每个变更流事件文档都包括一个恢复令牌作为_id
field._id
字段。Pass the entire传递更改事件文档的整个_id
field of the change event document that represents the operation you want to resume after._id
字段,该字段表示要在之后恢复的操作。
startAfter
is mutually exclusive withresumeAfter
andstartAtOperationTime
.startAfter
与resumeAfter
和startAtOperationTime
互斥。
New in version 4.2.4.2版新增。fullDocument
string Optional.可选的。By default,默认情况下,Mongo.watch()
returns the delta of those fields modified by an update operation, instead of the entire updated document.Mongo.watch()
返回由更新操作修改的字段的delta,而不是整个更新的文档。
Set将fullDocument
to"updateLookup"
to directMongo.watch()
to look up the most current majority-committed version of the updated document.fullDocument
设置为"updateLookup"
以指示Mongo.watch()
查找更新文档的最新多数提交版本。Mongo.watch()
returns a除了fullDocument
field with the document lookup in addition to theupdateDescription
delta.updateDescription
delta之外,还返回一个带有文档查找的fullDocument
字段。batchSize
int Optional.可选的。Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster.指定在MongoDB集群的每一批响应中返回的最大更改事件数。
Has the same functionality as具有与cursor.batchSize()
.cursor.batchSize()
相同的功能。maxAwaitTimeMS
int Optional.可选的。The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch.服务器在返回空批之前等待新数据更改报告给更改流游标的最长时间(以毫秒为单位)。
Defaults to默认值为1000
milliseconds.1000
毫秒。collation
document Optional.可选的。Pass a collation document to specify collation for the change stream cursor.传递排序规则文档以指定更改流游标的排序规则。
If omitted, defaults to如果省略,则默认为simple
binary comparison.simple
二进制比较。startAtOperationTime
Timestamp Optional.可选的。The starting point for the change stream.变更流的起点。If the specified starting point is in the past, it must be in the time range of the oplog.如果指定的起点在过去,则它必须在oplog的时间范围内。To check the time range of the oplog, see要检查oplog的时间范围,请参阅rs.printReplicationInfo()
.rs.printReplicationInfo()
。
startAtOperationTime
is mutually exclusive withresumeAfter
andstartAfter
.startAtOperationTime
与resumeAfter
和startAfter
互斥。Returns:返回值:A cursor over the change event documents. See Change Events for examples of change event documents.将游标悬停在更改事件文档上。有关更改事件文档的示例,请参阅更改事件。TipSee also:另请参阅:
Availability可用性
Deployment部署
Mongo.watch()
is available for replica sets and sharded clusters:可用于副本集和分片集群:
For a replica set, you can issue对于副本集,您可以对任何承载数据的成员发出Mongo.watch()
on any data-bearing member.Mongo.watch()
。For a sharded cluster, you must issue对于分片集群,必须在Mongo.watch()
on amongos
instance.mongos
实例上发出Mongo.watch()
。
Storage Engine存储引擎
You can only use 您只能将Mongo.watch()
with the Wired Tiger storage engine.Mongo.watch()
与Wired Tiger存储引擎一起使用。
Read Concern majority
Support读取关注majority
支持
majority
SupportStarting in MongoDB 4.2, change streams are available regardless of the 从MongoDB 4.2开始,无论"majority"
read concern support; that is, read concern majority
support can be either enabled (default) or disabled to use change streams."majority"
读取关注支持如何,都可以使用更改流;也就是说,为了使用变更流,可以启用(默认)或禁用读取关注majority
支持。
In MongoDB 4.0 and earlier, change streams are available only if 在MongoDB 4.0及更早版本中,只有启用了"majority"
read concern support is enabled (default)."majority"
读取关注支持(默认),更改流才可用。
Behavior行为
Mongo.watch()
only notifies on data changes that have persisted to a majority of data-bearing members.仅通知已持久存在于大多数数据承载成员的数据更改。The change stream cursor remains open until one of the following occurs:更改流游标保持打开状态,直到出现以下情况之一:The cursor is explicitly closed.游标明确关闭。An invalidate event occurs; for example, a collection drop or rename.发生失效事件;例如集合删除或重命名。The connection to the MongoDB deployment is closed.与MongoDB部署的连接已关闭。If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.如果部署是分片集群,则分片移除可能会导致打开的变更流游标关闭,并且关闭的变更流游标可能无法完全恢复。
Resumability可恢复性
Unlike the MongoDB Drivers, 与MongoDB驱动程序不同,mongosh
does not automatically attempt to resume a change stream cursor after an error. mongosh
不会在出现错误后自动尝试恢复更改流游标。The MongoDB drivers make one attempt to automatically resume a change stream cursor after certain errors.MongoDB驱动程序尝试在出现某些错误后自动恢复更改流游标。
Mongo.watch()
uses information stored in the oplog to produce the change event description and generate a resume token associated to that operation. 使用oplog中存储的信息来生成更改事件描述,并生成与该操作相关联的恢复令牌。If the operation identified by the resume token passed to the 如果传递给resumeAfter
or startAfter
option has already dropped off the oplog, Mongo.watch()
cannot resume the change stream.resumeAfter
或startAfter
选项的resume令牌标识的操作已经从oplog中删除,Mongo.watch()
将无法恢复更改流。
See Resume a Change Stream for more information on resuming a change stream.有关恢复更改流的详细信息,请参阅恢复更改流。
You cannot use在无效事件(例如,集合丢弃或重命名)关闭更改流后,不能使用resumeAfter
to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream.resumeAfter
恢复更改流。Starting in MongoDB 4.2, you can use startAfter to start a new change stream after an invalidate event.从MongoDB 4.2开始,您可以使用startAfter
在失效事件后启动一个新的更改流。If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.如果部署是分片集群,则分片移除可能会导致打开的变更流游标关闭,并且关闭的变更流游标可能无法完全恢复。
Resume Token恢复令牌
The resume token 恢复令牌_data
type depends on the MongoDB versions and, in some cases, the feature compatibility version (fcv) at the time of the change stream's opening/resumption (i.e. a change in fcv value does not affect the resume tokens for already opened change streams):_data
类型取决于MongoDB版本,在某些情况下,还取决于更改流打开/恢复时的功能兼容性版本(fcv)(即fcv值的更改不会影响已打开更改流的恢复令牌):
MongoDB Version | _data Type_data 类型 | |
---|---|---|
MongoDB 4.2 and later | "4.2" or "4.0" | v1 )v1 ) |
MongoDB 4.0.7 and later | "4.0" or "3.6" | Hex-encoded string (v1 ) |
MongoDB 4.0.6 and earlier | "4.0" | Hex-encoded string (v0 ) |
MongoDB 4.0.6 and earlier | "3.6" | BinData |
MongoDB 3.6 | "3.6" | BinData |
Hex Encoded Tokens十六进制编码的令牌
With hex-encoded string resume tokens, you can compare and sort the resume tokens.使用十六进制编码的字符串恢复令牌,可以对恢复令牌进行比较和排序。
Regardless of the fcv value, a 4.0 deployment can use either BinData resume tokens or hex string resume tokens to resume a change stream. As such, a 4.0 deployment can use a resume token from a change stream opened on a collection from a 3.6 deployment.无论fcv值如何,4.0部署都可以使用BinData恢复令牌或十六进制字符串恢复令牌来恢复更改流。因此,4.0部署可以使用在3.6部署的集合上打开的更改流中的恢复令牌。
New resume token formats introduced in a MongoDB version cannot be consumed by earlier MongoDB versions.MongoDB版本中引入的新恢复令牌格式不能被早期的MongoDB版本使用。
Decode Resume Tokens解码恢复令牌
MongoDB provides a "snippet", an extension to MongoDB提供了一个"snippet",它是mongosh
, that decodes hex-encoded resume tokens.mongosh
的扩展,用于解码十六进制编码的恢复令牌。
You can install and run the resumetoken您可以从 snippet from
mongosh
:mongosh
安装并运行resumetoken片段:
snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')
You can also run resumetoken如果系统上安装了 from the command line (without using
mongosh
) if npm
is installed on your system:npm
,您也可以从命令行运行resumetoken
(不使用
mongosh
):
npx mongodb-resumetoken-decoder <RESUME TOKEN>
See the following for more details on:有关的更多详细信息,请参阅以下内容:
Full Document Lookup of Update Operations更新操作的完整文档查找
By default, the change stream cursor returns specific field changes/deltas for update operations. 默认情况下,更改流游标返回用于更新操作的特定字段更改/增量。You can also configure the change stream to look up and return the current majority-committed version of the changed document. Depending on other write operations that may have occurred between the update and the lookup, the returned document may differ significantly from the document at the time of the update.您还可以配置更改流以查找并返回更改文档的当前多数提交版本。根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档有很大不同。
Depending on the number of changes applied during the update operation and the size of the full document, there is a risk that the size of the change event document for an update operation is greater than the 16MB BSON document limit. If this occurs, the server closes the change stream cursor and returns an error.根据更新操作期间应用的更改数量和完整文档的大小,更新操作的更改事件文档的大小可能大于16MB BSON文档限制。如果发生这种情况,服务器将关闭更改流游标并返回一个错误。
Availability可用性
Starting in MongoDB 4.2, change streams are available regardless of the 从MongoDB 4.2开始,无论"majority"
read concern support; that is, read concern majority
support can be either enabled (default) or disabled to use change streams."majority"
读取关注支持如何,都可以使用更改流;也就是说,为了使用变更流,可以启用(默认)或禁用读取关注多数支持。
In MongoDB 4.0 and earlier, change streams are available only if 在MongoDB 4.0及更早版本中,只有启用了"majority"
read concern support is enabled (default)."majority"
读取关注支持(默认),更改流才可用。
Access Control访问控制
When running with access control, the user must have the 在使用访问控制运行时,用户必须对所有数据库中的所有非系统集合具有find
and changeStream
privilege actions on all non-systems collections across all databases. find
和changeStream
权限操作。That is, a user must have a role that grants the following privilege:也就是说,用户必须具有授予以下权限的角色:
{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
The built-in 内置的read
role provides the appropriate privileges.read
角色提供适当的权限。
Cursor Iteration游标迭代
MongoDB provides multiple ways to iterate on a cursor.MongoDB提供了多种对游标进行迭代的方法。
The cursor.hasNext()
method blocks and waits for the next event. To monitor the watchCursor
cursor and iterate over the events, use hasNext()
like this:cursor.hasNext()
方法阻塞并等待下一个事件。要监视watchCursor
游标并迭代事件,请使用hasNext()
,如下所示:
while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}
The cursor.tryNext()
method is non-blocking. cursor.tryNext()
方法是非阻塞的。To monitor the 要监视watchCursor
cursor and iterate over the events, use tryNext()
like this:watchCursor
游标并迭代事件,请使用tryNext()
,如下所示:
while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}
Example实例
The following operation in mongosh
opens a change stream cursor on a replica set. mongosh
中的以下操作将打开复制集上的更改流游标。The returned cursor reports on data changes to all the non-返回的游标报告除system
collections across all databases except for the admin
, local
, and the config
databases.admin
、local
和config
数据库之外的所有数据库中所有非system
集合的数据更改。
watchCursor = db.getMongo().watch()
Iterate the cursor to check for new events. 循环游标以检查新事件。Use the 将cursor.isClosed()
method with the cursor.tryNext()
method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:cursor.isClosed()
方法与cursor.tryNext()
方法结合使用,以确保只有在更改流游标关闭并且最新批中没有剩余对象时,循环才会退出:
while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}
For complete documentation on change stream output, see Change Events.有关变更流输出的完整文档,请参阅变更事件。
You cannot use 不能将isExhausted()
with change streams.isExhausted()
与更改流一起使用。