db.collection.watch()
On this page
Definition定义- Availability
- Deployment
- Storage Engine
- Read Concern
majority
Support Behavior行为- Resumability
- Full Document Lookup of Update Operations
Access Control访问控制- Cursor Iteration
Examples实例- Open a Change Stream
- Change Stream with Full Document Update Lookup
- Change Streams with Document Pre- and Post-Images
- Change Stream with Aggregation Pipeline Filter
- Resuming a Change Stream
Definition定义
db.collection.watch( pipeline, options )
-
Important
mongosh Method
This page documents a
mongosh
method. This is not the documentation for database commands or language-specific drivers, such as Node.js.For the database command, see the
aggregate
command with the$changeStream
aggregation stage.For MongoDB API drivers, refer to the language-specific MongoDB driver documentation.有关MongoDB API驱动程序,请参阅特定语言的MongoDB驱动程序文档。For the legacy对于遗留的mongo
shell documentation, refer to the documentation for the corresponding MongoDB Server release:mongo
shell文档,请参阅相应MongoDB Server版本的文档:For replica sets and sharded clusters only仅适用于复制集和分片集群Opens a change stream cursor on the collection.在集合上打开更改流游标。Parameter Type类型Description描述pipeline
array Optional.可选的。An Aggregation Pipeline consisting of one or more of the following aggregation stages:由以下一个或多个聚合阶段组成的聚合管道:$addFields
$match
$project
$replaceRoot
$replaceWith
(Available starting in MongoDB 4.2)$redact
$set
(Available starting in MongoDB 4.2)$unset
(Available starting in MongoDB 4.2)
Specify a pipeline to filter/modify the change events output.指定用于筛选/修改更改事件输出的管道。Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.从MongoDB 4.2开始,如果变更流聚合管道修改了事件的_id
字段,则变更流将抛出异常。options
document Optional.可选的。Additional options that modify the behavior of修改watch()
.watch()
行为的其他选项。Theoptions
document can contain the following fields and values:options
文档可以包含以下字段和值:Field字段Type类型Description描述resumeAfter
document Optional.可选的。Directs指示watch()
to attempt resuming notifications starting after the operation specified in the resume token.watch()
尝试在恢复令牌中指定的操作之后开始恢复通知。Each change stream event document includes a resume token as the每个变更流事件文档都包括一个恢复令牌作为_id
field._id
字段。Pass the entire传递更改事件文档的整个_id
field of the change event document that represents the operation you want to resume after._id
字段,该字段表示要在之后恢复的操作。resumeAfter
is mutually exclusive withstartAfter
andstartAtOperationTime
.resumeAfter
与startAfter
和startAtOperationTime
互斥。startAfter
document Optional.可选的。Directs指示watch()
to attempt starting a new change stream after the operation specified in the resume token.watch()
在恢复令牌中指定的操作之后尝试启动新的更改流。Allows notifications to resume after an invalidate event.允许在发生无效事件后恢复通知。Each change stream event document includes a resume token as the每个变更流事件文档都包括一个恢复令牌作为_id
field._id
字段。Pass the entire传递更改事件文档的整个_id
field of the change event document that represents the operation you want to resume after._id
字段,该字段表示要在之后恢复的操作。startAfter
is mutually exclusive withresumeAfter
andstartAtOperationTime
.startAfter
与resumeAfter
和startAtOperationTime
互斥。New in version 4.2.fullDocument
string Optional.可选的。By default,watch()
returns the delta of those fields modified by an update operation, instead of the entire updated document.
SetfullDocument
to"updateLookup"
to directwatch()
to look up the most current majority-committed version of the updated document.watch()
returns afullDocument
field with the document lookup in addition to theupdateDescription
delta.
Starting in MongoDB 6.0, you can setfullDocument
to:"whenAvailable"
to output the document post-image, if available, after the document was inserted, replaced, or updated.以在插入、替换或更新文档后输出文档后图像(如果可用)。"required"
to output the document post-image after the document was inserted, replaced, or updated. Raises an error if the post-image is not available.以在插入、替换或更新文档后输出文档后图像。如果后期图像不可用,则引发错误。
fullDocumentBeforeChange
string Optional.可选。Starting in MongoDB 6.0, you can use the new从MongoDB 6.0开始,您可以使用新的fullDocumentBeforeChange
field and set it to:fullDocumentBeforeChange
字段,并将其设置为:"whenAvailable"
to output the document pre-image, if available, before the document was replaced, updated, or deleted.以在文档被替换、更新或删除之前输出文档预图像(如果可用)。"required"
to output the document pre-image before the document was replaced, updated, or deleted.以在文档被替换、更新或删除之前输出文档预图像。Raises an error if the pre-image is not available.如果预映像不可用,则引发错误。"off"
to suppress the document pre-image.以抑制文档预图像。"off"
is the default.是默认值。
batchSize
int Optional.可选的。Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster.指定在MongoDB集群的每一批响应中返回的最大更改事件数。Has the same functionality as具有与cursor.batchSize()
.cursor.batchSize()
相同的功能。maxAwaitTimeMS
int Optional.可选的。The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch.服务器在返回空批之前等待新数据更改报告给更改流游标的最长时间(以毫秒为单位)。Defaults to默认值为1000
milliseconds.1000
毫秒。collation
document Optional.可选的。Pass a collation document to specify a collation for the change stream cursor.传递排序规则文档以指定更改流游标的排序规则。
Starting in MongoDB 4.2, defaults tosimple
binary comparison if omitted. In earlier versions, change streams opened on a single collection would inherit the collection's default collation.showExpandedEvents
boolean Optional.可选的。Starting in MongoDB 6.0, change streams support change notifications for DDL events, like the createIndexes and dropIndexes events.从MongoDB 6.0开始,更改流支持DDL事件的更改通知,如createIndexes
和dropIndexes
事件。To include expanded events in a change stream, create the change stream cursor using the要在变更流中包括展开的事件,请使用showExpandedEvents
option.showExpandedEvents
选项创建变更流游标。New in version 6.0.startAtOperationTime
Timestamp Optional.可选的。The starting point for the change stream.变更流的起点。If the specified starting point is in the past, it must be in the time range of the oplog.如果指定的起点在过去,则它必须在oplog的时间范围内。To check the time range of the oplog, see要检查oplog的时间范围,请参阅rs.printReplicationInfo()
.rs.printReplicationInfo()
。startAtOperationTime
is mutually exclusive with与resumeAfter
andstartAfter
.resumeAfter
和startAfter
互斥。Returns: A cursor that remains open as long as a connection to the MongoDB deployment remains open and the collection exists. See Change Events for examples of change event documents. TipSee also:另请参阅:
Availability
Deployment
db.collection.watch()
is available for replica set and sharded cluster deployments :
-
For a replica set, you can issue
db.collection.watch()
on any data-bearing member. -
For a sharded cluster, you must issue
db.collection.watch()
on amongos
instance.
Storage Engine
You can only use db.collection.watch()
with the Wired Tiger storage engine.
Read Concern majority
Support
Starting in MongoDB 4.2, change streams are available regardless of the "majority"
read concern support; that is, read concern majority
support can be either enabled (default) or disabled to use change streams.
In MongoDB 4.0 and earlier, change streams are available only if "majority"
read concern support is enabled (default).
Behavior行为
-
db.collection.watch()
only notifies on data changes that have persisted to a majority of data-bearing members. -
The change stream cursor remains open until one of the following occurs:
-
The cursor is explicitly closed.
-
An invalidate event occurs; for example, a collection drop or rename.
-
The connection to the MongoDB deployment is closed.
-
If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.
-
Resumability
Unlike the MongoDB Drivers, mongosh
does not automatically attempt to resume a change stream cursor after an error. The MongoDB drivers make one attempt to automatically resume a change stream cursor after certain errors.
db.collection.watch()
uses information stored in the oplog to produce the change event description and generate a resume token associated to that operation. If the operation identified by the resume token passed to the resumeAfter
or startAfter
option has already dropped off the oplog, db.collection.watch()
cannot resume the change stream.
See Resume a Change Stream for more information on resuming a change stream.
-
You cannot use
resumeAfter
to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. Starting in MongoDB 4.2, you can use startAfter to start a new change stream after an invalidate event. -
If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.
You cannot use resumeAfter
to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. Starting in MongoDB 4.2, you can use startAfter to start a new change stream after an invalidate event.
Full Document Lookup of Update Operations
By default, the change stream cursor returns specific field changes/deltas for update operations. You can also configure the change stream to look up and return the current majority-committed version of the changed document. Depending on other write operations that may have occurred between the update and the lookup, the returned document may differ significantly from the document at the time of the update.
Depending on the number of changes applied during the update operation and the size of the full document, there is a risk that the size of the change event document for an update operation is greater than the 16MB BSON document limit. If this occurs, the server closes the change stream cursor and returns an error.
Access Control访问控制
When running with access control, the user must have the find
and changeStream
privilege actions on the collection resource. That is, a user must have a role that grants the following privilege:
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
The built-in read
role provides the appropriate privileges.
Cursor Iteration
MongoDB provides multiple ways to iterate on a cursor.
The cursor.hasNext()
method blocks and waits for the next event. To monitor the watchCursor
cursor and iterate over the events, use hasNext()
like this:
while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}
The cursor.tryNext()
method is non-blocking. To monitor the watchCursor
cursor and iterate over the events, use tryNext()
like this:
while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}
Examples实例
Open a Change Stream
The following operation opens a change stream cursor against the data.sensors
collection:
watchCursor = db.getSiblingDB("data").sensors.watch()
Iterate the cursor to check for new events. Use the cursor.isClosed()
method with the cursor.tryNext()
method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:
while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}
For complete documentation on change stream output, see Change Events.
Change Stream with Full Document Update Lookup
Set the fullDocument
option to "updateLookup"
to direct the change stream cursor to lookup the most current majority-committed version of the document associated to an update change stream event.
The following operation opens a change stream cursor against the data.sensors
collection using the fullDocument : "updateLookup"
option.
watchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ fullDocument : "updateLookup" }
)
Iterate the cursor to check for new events. Use the cursor.isClosed()
method with the cursor.tryNext()
method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:
while (!watchCursor.isClosed()) {
let next = watchCursor.tryNext()
while (next !== null) {
printjson(next);
next = watchCursor.tryNext()
}
}
For any update operation, the change event returns the result of the document lookup in the fullDocument
field.
For an example of the full document update output, see change stream update event.
For complete documentation on change stream output, see Change Events.
Change Streams with Document Pre- and Post-Images
Starting in MongoDB 6.0, you can use change stream events to output the version of a document before and after changes (the document pre- and post-images):
-
The pre-image is the document before it was replaced, updated, or deleted. There is no pre-image for an inserted document.
-
The post-image is the document after it was inserted, replaced, or updated. There is no post-image for a deleted document.
-
Enable
changeStreamPreAndPostImages
for a collection usingdb.createCollection()
,create
, orcollMod
.
Pre- and post-images are not available for a change stream event if the images were:
-
Not enabled on the collection at the time of a document update or delete operation.
-
Removed after the pre- and post-image retention time set in
expireAfterSeconds
.-
The following example sets
expireAfterSeconds
to100
seconds:use admin
db.runCommand( {
setClusterParameter:
{ changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } }
} ) -
The following example returns the current
changeStreamOptions
settings, includingexpireAfterSeconds
:db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
-
Setting
expireAfterSeconds
tooff
uses the default retention policy: pre- and post-images are retained until the corresponding change stream events are removed from the oplog. -
If a change stream event is removed from the oplog, then the corresponding pre- and post-images are also deleted regardless of the
expireAfterSeconds
pre- and post-image retention time.
-
Additional considerations:
-
Enabling pre- and post-images consumes storage space and adds processing time. Only enable pre- and post-images if you need them.
-
Limit the change stream event size to less than 16 megabytes. To limit the event size, you can:
-
Limit the document size to 8 megabytes. You can request pre- and post-images simultaneously in the change stream output if other change stream event fields like
updateDescription
are not large. -
Request only post-images in the change stream output for documents up to 16 megabytes if other change stream event fields like
updateDescription
are not large. -
Request only pre-images in the change stream output for documents up to 16 megabytes if:
-
document updates affect only a small fraction of the document structure or content, and
-
do not cause a
replace
change event. Areplace
event always includes the post-image.
-
-
-
To request a pre-image, you set
fullDocumentBeforeChange
torequired
orwhenAvailable
indb.collection.watch()
. To request a post-image, you setfullDocument
using the same method. -
Pre-images are written to the
config.system.preimages
collection.-
The
config.system.preimages
collection may become large. To limit the collection size, you can setexpireAfterSeconds
time for the pre-images as shown earlier. -
Pre-images are removed asynchronously by a background process.
-
Backward-Incompatible Feature
Starting in MongoDB 6.0, if you are using document pre- and post-images for change streams, you must disable changeStreamPreAndPostImages for each collection using the collMod
command before you can downgrade to an earlier MongoDB version.
See also: 另请参阅:
-
For change stream events and output, see Change Events.
-
To watch a collection for changes, see
db.collection.watch()
. -
For complete examples with the change stream output, see Change Streams with Document Pre- and Post-Images.
Create Collection
Create a temperatureSensor
collection that has changeStreamPreAndPostImages enabled:
db.createCollection(
"temperatureSensor",
{ changeStreamPreAndPostImages: { enabled: true } }
)
Populate the temperatureSensor
collection with temperature readings:
db.temperatureSensor.insertMany( [
{ "_id" : 0, "reading" : 26.1 },
{ "_id" : 1, "reading" : 25.9 },
{ "_id" : 2, "reading" : 24.3 },
{ "_id" : 3, "reading" : 22.4 },
{ "_id" : 4, "reading" : 24.6 }
] )
The following sections show change stream examples for document pre- and post-images that use the temperatureSensor
collection.
Change Stream with Document Pre-Image
You use the fullDocumentBeforeChange: "whenAvailable"
setting to output the document pre-image, if available. The pre-image is the document before it was replaced, updated, or deleted. There is no pre-image for an inserted document.
The following example creates a change stream cursor for the temperatureSensor
collection using fullDocumentBeforeChange:
"whenAvailable"
:
watchCursorFullDocumentBeforeChange = db.temperatureSensor.watch(
[],
{ fullDocumentBeforeChange: "whenAvailable" }
)
The following example uses the cursor to check for new change stream events:
while ( !watchCursorFullDocumentBeforeChange.isClosed() ) {
if ( watchCursorFullDocumentBeforeChange.hasNext() ) {
printjson( watchCursorFullDocumentBeforeChange.next() );
}
}
In the example:
-
The
while
loop runs until the cursor is closed. -
hasNext()
returnstrue
if the cursor has documents.
The following example updates the reading
field for a temperatureSensor
document:
db.temperatureSensor.updateOne(
{ _id: 2 },
{ $set: { reading: 22.1 } }
)
After the temperatureSensor
document is updated, the change event outputs the document pre-image in the fullDocumentBeforeChange
field. The pre-image contains the temperatureSensor
document reading
field before it was updated. For example:例如:
{
"_id" : {
"_data" : "82624B21...",
"_typeBits" : BinData(0,"QA==")
},
"operationType" : "update",
"clusterTime" : Timestamp(1649090957, 1),
"ns" : {
"db" : "test",
"coll" : "temperatureSensor"
},
"documentKey" : {
"_id" : 2
},
"updateDescription" : {
"updatedFields" : {
"reading" : 22.1
},
"removedFields" : [ ],
"truncatedArrays" : [ ]
},
"fullDocumentBeforeChange" : {
"_id" : 2,
"reading" : 24.3
}
}
See also: 另请参阅:
-
For document update output details, see change stream update events.
-
For change stream output details, see Change Events.
Change Stream with Document Post-Image
You use the fullDocument: "whenAvailable"
setting to output the document post-image, if available. The post-image is the document after it was inserted, replaced, or updated. There is no post-image for a deleted document.
The following example creates a change stream cursor for the temperatureSensor
collection using fullDocument:
"whenAvailable"
:
watchCursorFullDocument = db.temperatureSensor.watch(
[],
{ fullDocument: "whenAvailable" }
)
The following example uses the cursor to check for new change stream events:
while ( !watchCursorFullDocument.isClosed() ) {
if ( watchCursorFullDocument.hasNext() ) {
printjson( watchCursorFullDocument.next() );
}
}
In the example:
-
The
while
loop runs until the cursor is closed. -
hasNext()
returnstrue
if the cursor has documents.
The following example updates the reading
field for a temperatureSensor
document:
db.temperatureSensor.updateOne(
{ _id: 1 },
{ $set: { reading: 29.5 } }
)
After the temperatureSensor
document is updated, the change event outputs the document post-image in the fullDocument
field. The post-image contains the temperatureSensor
document reading
field after it was updated. For example:例如:
{
"_id" : {
"_data" : "8262474D...",
"_typeBits" : BinData(0,"QA==")
},
"operationType" : "update",
"clusterTime" : Timestamp(1648840090, 1),
"fullDocument" : {
"_id" : 1,
"reading" : 29.5
},
"ns" : {
"db" : "test",
"coll" : "temperatureSensor"
},
"documentKey" : {
"_id" : 1
},
"updateDescription" : {
"updatedFields" : {
"reading" : 29.5
},
"removedFields" : [ ],
"truncatedArrays" : [ ]
}
}
See also: 另请参阅:
-
For document update output details, see change stream update events.
-
For change stream output details, see Change Events.
Change Stream with Aggregation Pipeline Filter
Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.
The following operation opens a change stream cursor against the data.sensors
collection using an aggregation pipeline to filter only insert
events:
watchCursor = db.getSiblingDB("data").sensors.watch(
[
{ $match : {"operationType" : "insert" } }
]
)
Iterate the cursor to check for new events. Use the cursor.isClosed()
method with the cursor.hasNext()
method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:
while (!watchCursor.isClosed()){
if (watchCursor.hasNext()){
printjson(watchCursor.next());
}
}
The change stream cursor only returns change events where the operationType
is insert
. For complete documentation on change stream output, see Change Events.
Resuming a Change Stream
Every document returned by a change stream cursor includes a resume token as the _id
field. To resume a change stream, pass the entire _id
document of the change event you want to resume from to either the resumeAfter
or startAfter
option of watch()
.
The following operation resumes a change stream cursor against the data.sensors
collection using a resume token. This assumes that the operation that generated the resume token has not rolled off the cluster's oplog.
let watchCursor = db.getSiblingDB("data").sensors.watch();
let firstChange;
while (!watchCursor.isClosed()) {
if (watchCursor.hasNext()) {
firstChange = watchCursor.next();
break;
}
}
watchCursor.close();
let resumeToken = firstChange._id;
resumedWatchCursor = db.getSiblingDB("data").sensors.watch(
[],
{ resumeAfter : resumeToken }
)
Iterate the cursor to check for new events. Use the cursor.isClosed()
method with the cursor.hasNext()
method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:
while (!resumedWatchCursor.isClosed()){
if (resumedWatchCursor.hasNext()){
print(resumedWatchCursor.next());
}
}
See Resume a Change Stream for complete documentation on resuming a change stream.