Change streams allow applications to access real-time data changes without the prior complexity and risk of manually tailing the oplog. 变更流允许应用程序访问实时数据变更,而无需事先手动跟踪oplog的复杂性和风险。Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.应用程序可以使用更改流订阅单个集合、数据库或整个部署上的所有数据更改,并立即对其做出反应。由于变更流使用聚合框架,应用程序还可以筛选特定的变更或随意转换通知。
Note
Change streams are restricted to database events. Atlas Stream Processing has extended functionality, including managing multiple data event types and processing streams of complex data using the same query API as Atlas databases. 更改流仅限于数据库事件。Atlas流处理具有扩展的功能,包括使用与Atlas数据库相同的查询API管理多个数据事件类型和处理复杂数据流。For more information, see Atlas Stream Processing.有关更多信息,请参阅Atlas流处理。
Starting in MongoDB 5.1, change streams are optimized, providing more efficient resource utilization and faster execution of some aggregation pipeline stages.从MongoDB 5.1开始,更改流得到了优化,提供了更高效的资源利用率和更快地执行某些聚合管道阶段。
Availability可用性
Change streams are available for replica sets and sharded clusters:更改流可用于副本集和分片集群:
Storage Engine.存储引擎。The replica sets and sharded clusters must use the WiredTiger storage engine. Change streams can also be used on deployments that employ MongoDB's encryption-at-rest feature.副本集和分片群集必须使用WiredTiger存储引擎。变更流也可用于采用MongoDB静态加密功能的部署。Replica Set Protocol Version.副本集协议版本。The replica sets and sharded clusters must use replica set protocol version 1 (副本集和分片集群必须使用副本集协议版本1(pv1).pv1)。Read Concern "majority" Enablement.读取关注“多数”支持。Change streams are available regardless of the无论是否支持"majority"read concern support; that is, read concernmajoritysupport can be either enabled (default) or disabled to use change streams."majority"读取关注,都可以使用更改流;也就是说,可以启用(默认)或禁用读取关注majority支持以使用更改流。
Stable API Support稳定的API支持
Change streams are included in Stable API V1. 变更流包含在稳定的API V1中。However, the showExpandedEvents option is not included in Stable API V1.但是,showExpandedEvents选项未包含在Stable API V1中。
Connect连接
Connections for a change stream can either use DNS seed lists with the 更改流的连接可以使用带有+srv connection option or by listing the servers individually in the connection string.+srv连接选项的DNS种子列表,也可以在连接字符串中单独列出服务器。
If the driver loses the connection to a change stream or the connection goes down, it attempts to reestablish a connection to the change stream through another node in the cluster that has a matching read preference. 如果驱动程序失去与更改流的连接或连接中断,它会尝试通过群集中具有匹配读取首选项的另一个节点重新建立与更改流之间的连接。If the driver cannot find a node with the correct read preference, it throws an exception.如果驱动程序找不到具有正确读取首选项的节点,则会抛出异常。
For more information, see Connection String URI Format.有关更多信息,请参阅连接字符串URI格式。
Watch a Collection, Database, or Deployment监视集合、数据库或部署
You can open change streams against:您可以针对以下对象打开更改流:
| |
| |
|
Note
Change Stream Examples更改流示例
The examples on this page use the MongoDB drivers to illustrate how to open a change stream cursor for a collection and work with the change stream cursor.本页上的示例使用MongoDB驱动程序来说明如何打开集合的更改流游标并使用更改流游标。
Change Stream Performance Considerations更改流性能注意事项
If the amount of active change streams opened against a database exceeds the connection pool size, you may experience notification latency. 如果针对数据库打开的活动更改流的数量超过连接池大小,您可能会遇到通知延迟。Each change stream uses a connection and a getMore operation on the change stream for the period of time that it waits for the next event. 每个更改流在等待下一个事件的时间段内对更改流使用连接和getMore操作。To avoid any latency issues, you should ensure that the pool size is greater than the number of opened change streams. For details see the maxPoolSize setting.为了避免任何延迟问题,您应该确保池大小大于打开的更改流的数量。有关详细信息,请参阅maxPoolSize设置。
Sharded Cluster Considerations分片化集群考虑因素
When a change stream is opened on a sharded cluster:当在分片集群上打开更改流时:
Themongoscreates individual change streams on each shard. This behavior occurs regardless of whether the change stream targets a particular shard key range.mongos在每个分片上创建单独的变更流。无论更改流是否针对特定的分片键范围,都会发生这种行为。When the当mongosreceives change stream results, it sorts and filters those results. If needed, themongosalso performs afullDocumentlookup.mongos收到更改流结果时,它会对这些结果进行排序和筛选。如果需要,mongos还会执行完整的文档查找。
For best performance, limit the use of 为了获得最佳性能,请限制在更改流中使用$lookup queries in change streams.$lookup查询。
Open A Change Stream打开变更流
To open a change stream:要打开更改流,请执行以下操作:
For a replica set, you can issue the open change stream operation from any of the data-bearing members.对于副本集,您可以从任何数据承载成员发出打开更改流操作。For a sharded cluster, you must issue the open change stream operation from the对于分片集群,您必须从mongos.mongos发出开放更改流操作。
The following example opens a change stream for a collection and iterates over the cursor to retrieve the change stream documents. 以下示例为集合打开一个更改流,并在游标上迭代以检索更改流文档。[1]
➤ Use the Select your language drop-down menu in the upper-right to set the language of the examples on this page.使用右上角的“选择语言”下拉菜单设置此页面上示例的语言。
C
The C examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的C示例假设您已连接到MongoDB副本集,并访问了包含inventory collection.inventory集合的数据库。
mongoc_collection_t *collection;
bson_t *pipeline = bson_new ();
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
const bson_t *resume_token;
bson_error_t error;
collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);C#
The C# examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的C#示例假设您已连接到MongoDB副本集,并访问了包含inventory collection.inventory集合的数据库。
var cursor = inventory.Watch();
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();Go
The Go examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的Go示例假设您已连接到MongoDB副本集,并访问了包含inventory collection.inventory集合的数据库。
cs, err := coll.Watch(ctx, mongo.Pipeline{})
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.CurrentJava(Sync)
The Java examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的Java示例假设您已连接到MongoDB副本集,并访问了包含inventory collection.inventory集合的数据库。
MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();Catlin(Coroutine)
The Kotlin examples below assume that you are connected to a MongoDB replica set and can access a database that contains the 下面的Kotlin示例假设您连接到MongoDB副本集,并且可以访问包含inventory collection. inventory集合的数据库。To learn more about completing these tasks, see the Kotlin Coroutine Driver Databases and Collections guide.要了解有关完成这些任务的更多信息,请参阅Kotlin Coroutine驱动程序数据库和集合指南。
val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}motor
The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的示例假设您已连接到MongoDB副本集,并访问了包含inventory collection.inventory集合的数据库。
cursor = db.inventory.watch()
document = await cursor.next()Node.js
The Node.js examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的Node.js示例假设您已连接到MongoDB副本集,并访问了包含inventory collection.inventory集合的数据库。
The following example uses stream to process the change events.以下示例使用流来处理更改事件。
const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream
.on('change', next => {
// process next document
})
.once('error', () => {
// handle error
});
Alternatively, you can also use iterator to process the change events:或者,您还可以使用迭代器来处理更改事件:
const collection = db.collection('inventory');
const changeStream = collection.watch();
const next = await changeStream.next();
ChangeStream extends EventEmitter.ChangeStream扩展了EventEmitter。
php
The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的示例假设您已连接到MongoDB副本集,并访问了包含inventory collection.inventory集合的数据库。
$changeStream = $db->inventory->watch();
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();python
The Python examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的Python示例假设您已连接到MongoDB副本集,并访问了包含inventory collection.inventory集合的数据库。
cursor = db.inventory.watch()
next(cursor)Ruby
The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的示例假设您已连接到MongoDB副本集,并访问了包含inventory collection.inventory集合的数据库。
cursor = inventory.watch.to_enum
next_change = cursor.nextSwift(async)
The Swift (Async) examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.
let inventory = db.collection("inventory")
// Option 1: retrieve next document via next()
let next = inventory.watch().flatMap { cursor in
cursor.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch().flatMap { cursor in
cursor.forEach { event in
// process event
print(event)
}
}Swift(Sync)
The Swift (Sync) examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的Swift(Sync)示例假设您已连接到MongoDB副本集并访问了包含inventory collection.inventory集合的数据库。
let inventory = db.collection("inventory")
let changeStream = try inventory.watch()
let next = changeStream.next()To retrieve the data change event from the cursor, iterate the change stream cursor. For information on the change stream event, see Change Events.要从游标检索数据更改事件,请迭代更改流游标。有关更改流事件的信息,请参阅更改事件。
The change stream cursor remains open until one of the following occurs:更改流游标将保持打开状态,直到出现以下情况之一:
The cursor is explicitly closed.游标已明确关闭。An invalidate event occurs; for example, a collection drop or rename.发生无效事件;例如,删除或重命名集合。The connection to the MongoDB deployment closes or times out. See Behavior for more information.与MongoDB部署的连接关闭或超时。有关更多信息,请参阅行为。If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close. The closed change stream cursor may not be fully resumable.如果部署是分片集群,则分片删除可能会导致打开的更改流游标关闭。关闭的更改流游标可能无法完全恢复。
Note
The lifecycle of an unclosed cursor is language-dependent.未关闭游标的生命周期取决于语言。
| [1] | startAtOperationTime to open the cursor at a particular point in time. If the specified starting point is in the past, it must be in the time range of the oplog.startAtOperationTime以在特定时间点打开游标。如果指定的起点在过去,则它必须在oplog的时间范围内。 |
Modify Change Stream Output修改更改流输出
➤ Use the Select your language drop-down menu in the upper-right to set the language of the examples on this page.使用右上角的“选择语言”下拉菜单设置此页面上示例的语言。
C
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出:
pipeline = BCON_NEW ("pipeline",
"[",
"{",
"$match",
"{",
"fullDocument.username",
BCON_UTF8 ("alice"),
"}",
"}",
"{",
"$addFields",
"{",
"newField",
BCON_UTF8 ("this is an added field!"),
"}",
"}",
"]");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);C#
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出:
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.FullDocument["username"] == "alice" ||
change.OperationType == ChangeStreamOperationType.Delete)
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
"{ $addFields : { newField : 'this is an added field!' } }");
var collection = database.GetCollection<BsonDocument>("inventory");
using (var cursor = collection.Watch(pipeline))
{
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
}Go
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出:
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
bson.A{
bson.D{{"fullDocument.username", "alice"}},
bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.CurrentJava(Sync)
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出:
MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");
// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");
// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.username': 'alice'}"),
Filters.in("operationType", asList("delete")))));
// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<Document> cursor = collection.watch(pipeline).iterator();
The pipeline list includes a single $match stage that filters for any operations that meet one or both of the following criteria:
usernamevalue isaliceoperationTypevalue isdelete
Passing the pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline.
Kotlin(Coroutine)
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出:
val pipeline = listOf(
Aggregates.match(
or(
eq("fullDocument.username", "alice"),
`in`("operationType", listOf("delete"))
)
))
val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}
The pipeline list includes a single $match stage that filters for any operations that meet one or both of the following criteria:pipeline列表包含一个$match阶段,用于筛选满足以下一个或两个条件的任何操作:
usernamevalue is值是aliceoperationTypevalue is值是delete
Passing the 将pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline.pipeline传递给watch()方法会指示更改流在通过指定管道后返回通知。
Motor
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出:
pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = await cursor.next()Node.js
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出:
The following example uses stream to process the change events.以下示例使用流来处理更改事件。
const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream
.on('change', next => {
// process next document
})
.once('error', error => {
// handle error
});
Alternatively, you can also use iterator to process the change events:或者,您还可以使用迭代器来处理更改事件:
const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();php
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出:
$pipeline = [
['$match' => ['fullDocument.username' => 'alice']],
['$addFields' => ['newField' => 'this is an added field!']],
];
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();Python
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出:
pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
next(cursor)Ruby
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出:
Swift(Async)
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出:
let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}Swift(Sync)
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出:
let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
let next = changeStream.next()Tip
The _id field of the change stream event document act as the resume token. Do not use the pipeline to modify or remove the change stream event's 更改流事件文档的_id field._id字段充当恢复令牌。不要使用管道修改或删除更改流事件的_id字段。
Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.从MongoDB 4.2开始,如果更改流聚合管道修改了事件的_id字段,则更改流将抛出异常。
See Change Events for more information on the change stream response document format.有关更改流响应文档格式的更多信息,请参阅更改事件。
Lookup Full Document for Update Operations查找更新操作的完整文档
By default, change streams only return the delta of fields during the update operation. However, you can configure the change stream to return the most current majority-committed version of the updated document.默认情况下,更改流在更新操作期间仅返回字段的增量。但是,您可以配置更改流以返回更新文档的最新多数提交版本。
➤ Use the Select your language drop-down menu in the upper-right to set the language of the examples on this page.使用右上角的“选择语言”下拉菜单设置此页面上示例的语言。
C
To return the most current majority-committed version of the updated document, pass the 要返回更新文档的最新多数提交版本,请将带有"fullDocument" option with the "updateLookup" value to the mongoc_collection_watch method."updateLookup"值的"fullDocument"选项传递给mongo_collection_watch方法。
In the example below, all update operations notifications include a 在下面的示例中,所有更新操作通知都包含一个fullDocument field that represents the current version of the document affected by the update operation.fullDocument字段,表示受更新操作影响的文档的当前版本。
BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);C#
To return the most current majority-committed version of the updated document, pass 要返回更新文档的最新多数提交版本,请将"FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" to the db.collection.watch() method."FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"传递给db.collection.watch()方法。
In the example below, all update operations notifications include a 在下面的示例中,所有更新操作通知都包含一个FullDocument field that represents the current version of the document affected by the update operation.FullDocument字段,该字段表示受更新操作影响的文档的当前版本。
var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var cursor = inventory.Watch(options);
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();Go
To return the most current majority-committed version of the updated document, 要返回更新文档的最新多数提交版本,请使用SetFullDocument(options.UpdateLookup) change stream option.SetFullDocument(options.UpdateLookup)更改流选项。
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
assert.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.CurrentJava(Sync)
To return the most current majority-committed version of the updated document, pass 要返回更新文档的最新多数提交版本,请将FullDocument.UPDATE_LOOKUP to the db.collection.watch.fullDocument() method.FullDocument.UPDATE_LOOKUP传递给db.collection.watch.fullDocument()方法。
In the example below, all update operations notifications include a 在下面的示例中,所有更新操作通知都包含一个FullDocument field that represents the current version of the document affected by the update operation.FullDocument字段,该字段表示受更新操作影响的文档的当前版本。
cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();Kotlin(Coroutine)
To return the most current majority-committed version of the updated document, pass 要返回更新文档的最新多数提交版本,请将FullDocument.UPDATE_LOOKUP to the ChangeStreamFlow.fullDocument() method.FullDocument.UPDATE_LOOKUP传递给ChangeStreamFlow.fullDocument()方法。
In the example below, all update operations notifications include a 在下面的示例中,所有更新操作通知都包含一个FullDocument field that represents the current version of the document affected by the update operation.FullDocument字段,该字段表示受更新操作影响的文档的当前版本。
val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}Motor
To return the most current majority-committed version of the updated document, pass 要返回更新文档的最新多数提交版本,请将full_document='updateLookup' to the db.collection.watch() method.full_document='updateLookup'传递给db.collection.watch()方法。
In the example below, all update operations notifications include a 在下面的示例中,所有更新操作通知都包含一个`full_document field that represents the current version of the document affected by the update operation.full_document字段,表示受更新操作影响的文档的当前版本。
cursor = db.inventory.watch(full_document="updateLookup")
document = await cursor.next()Node.js
To return the most current majority-committed version of the updated document, pass 要返回更新文档的最新多数提交版本,请将{ fullDocument: 'updateLookup' } to the db.collection.watch() method.{ fullDocument: 'updateLookup' }传递给db.collection.watch()方法。
In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.
The following example uses stream to process the change events.以下示例使用流来处理更改事件。
const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream
.on('change', next => {
// process next document
})
.once('error', error => {
// handle error
});
Alternatively, you can also use iterator to process the change events:或者,您还可以使用迭代器来处理更改事件:
const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();PHP
To return the most current majority-committed version of the updated document, pass 要返回更新文档的最新多数提交版本,请将"fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP" to the db.watch() method."fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"传递给db.watch()方法。
In the example below, all update operations notifications include a 在下面的示例中,所有更新操作通知都包含一个fullDocument field that represents the current version of the document affected by the update operation.fullDocument字段,表示受更新操作影响的文档的当前版本。
$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();Python
To return the most current majority-committed version of the updated document, pass 要返回更新文档的最新多数提交版本,请将full_document='updateLookup' to the db.collection.watch() method.full_document='updateLookup'传递给db.collection.watch()方法。
In the example below, all update operations notifications include a 在下面的示例中,所有更新操作通知都包含一个full_document field that represents the current version of the document affected by the update operation.full_document字段,该字段表示受更新操作影响的文档的当前版本。
cursor = db.inventory.watch(full_document="updateLookup")
next(cursor)Ruby
To return the most current majority-committed version of the updated document, pass 要返回更新文档的最新多数提交版本,请将full_document: 'updateLookup' to the db.watch() method.full_document: 'updateLookup'传递给db.watch()方法。
In the example below, all update operations notifications include a 在下面的示例中,所有更新操作通知都包含一个full_document field that represents the current version of the document affected by the update operation.full_document字段,该字段表示受更新操作影响的文档的当前版本。
cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.nextSwift(Async)
To return the most current majority-committed version of the updated document, pass 要返回更新文档的最新多数提交版本,请将options: ChangeStreamOptions(fullDocument: .updateLookup) to the watch() method.options: ChangeStreamOptions(fullDocument: .updateLookup)传递给watch()方法。
let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}Swift(Sync)
To return the most current majority-committed version of the updated document, pass 要返回更新文档的最新多数提交版本,请将options: ChangeStreamOptions(fullDocument: .updateLookup) to the watch() method.options: ChangeStreamOptions(fullDocument: .updateLookup)传递给watch()方法。
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()Note
If there are one or more majority-committed operations that modified the updated document after the update operation but before the lookup, the full document returned may differ significantly from the document at the time of the update operation.如果在更新操作之后但在查找之前有一个或多个多数提交的操作修改了更新的文档,则返回的完整文档可能与更新操作时的文档存在显著差异。
However, the deltas included in the change stream document always correctly describe the watched collection changes that applied to that change stream event.然而,变更流文档中包含的增量总是正确地描述了应用于该变更流事件的所关注的集合变更。
The 如果以下情况之一为真,则更新事件的fullDocument field for an update event may be missing if one of the following is true:fullDocument字段可能会丢失:
If the document is deleted or if the collection is dropped in between the update and the lookup.如果文档被删除,或者集合在更新和查找之间被删除。If the update changes the values for at least one of the fields in that collection's shard key.如果更新更改了该集合分片键中至少一个字段的值。
See Change Events for more information on the change stream response document format.有关更改流响应文档格式的更多信息,请参阅更改事件。
Resume a Change Stream恢复更改流
Change streams are resumable by specifying a resume token to either resumeAfter or startAfter when opening the cursor.打开游标时,通过将恢复令牌指定为resumeAfter或startAfter,可以恢复更改流。
resumeAfter for Change Streams用于变更流
You can resume a change stream after a specific event by passing a resume token to 在打开游标时,通过向resumeAfter when opening the cursor.resumeAfter传递恢复令牌,可以在特定事件后恢复更改流。
See Resume Tokens for more information on the resume token.有关恢复令牌的更多信息,请参阅恢复令牌。
Important
The oplog must have enough history to locate the operation associated with the token or the timestamp, if the timestamp is in the past.如果时间戳是过去的,oplog必须有足够的历史记录来定位与令牌或时间戳相关的操作。You cannot use在无效事件(例如,集合删除或重命名)关闭更改流后,您不能使用resumeAfterto resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream.resumeAfter来恢复更改流。Instead, you can use startAfter to start a new change stream after an invalidate event.相反,您可以使用startAfter在无效事件后启动新的更改流。
C
In the example below, the 在下面的示例中,resumeAfter option is appended to the stream options to recreate the stream after it has been destroyed. Passing the _id to the change stream attempts to resume notifications starting after the operation specified.resumeAfter选项被附加到流选项中,以便在流被销毁后重新创建。将_id传递给更改流会尝试恢复指定操作后开始的通知。
stream = mongoc_collection_watch (collection, pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
resume_token = mongoc_change_stream_get_resume_token (stream);
BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);
mongoc_change_stream_destroy (stream);
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
mongoc_change_stream_destroy (stream);
} else {
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);
}C#
In the example below, the 在下面的示例中,resumeToken is retrieved from the last change stream document and passed to the Watch() method as an option. resumeToken从最后一个更改流文档中检索,并作为选项传递给Watch()方法。Passing the 将resumeToken to the Watch() method directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.resumeToken传递给Watch()方法会指示更改流尝试恢复在恢复令牌中指定的操作之后开始的通知。
var resumeToken = previousCursor.GetResumeToken();
var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
var cursor = inventory.Watch(options);
cursor.MoveNext();
var next = cursor.Current.First();
cursor.Dispose();Go
You can use ChangeStreamOptions.SetResumeAfter to specify the resume token for the change stream. 您可以使用ChangeStreamOptions.SetResumeAfter为更改流指定恢复令牌。If the resumeAfter option is set, the change stream resumes notifications after the operation specified in the resume token. 如果设置了resumeAfter选项,则更改流将在恢复令牌中指定的操作后恢复通知。The SetResumeAfter takes a value that must resolve to a resume token, e.g. resumeToken in the example below.
resumeToken := original.ResumeToken()
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
assert.NoError(t, err)
defer cs.Close(ctx)
ok = cs.Next(ctx)
result := cs.CurrentJava(Sync)
You can use the 您可以使用resumeAfter() method to resume notifications after the operation specified in the resume token. The resumeAfter() method takes a value that must resolve to a resume token, e.g. resumeToken in the example below.resumeAfter()方法在恢复令牌中指定的操作后恢复通知。resumeAfter()方法接受一个必须解析为恢复令牌的值,例如下面示例中的resumeToken。
BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();Kotlin(Coroutine)
You can use the 您可以使用ChangeStreamFlow.resumeAfter() method to resume notifications after the operation specified in the resume token. ChangeStreamFlow.resumeAfter()方法在恢复令牌中指定的操作后恢复通知。The resumeAfter() method takes a value that must resolve to a resume token, such as the resumeToken variable in the example below.resumeAfter()方法接受一个必须解析为恢复令牌的值,例如下面示例中的resumeToken变量。
val resumeToken = BsonDocument()
val job = launch {
val changeStream = collection.watch()
.resumeAfter(resumeToken)
changeStream.collect {
println(it)
}
}Motor
You can use the 您可以使用resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.resume_after修饰符在恢复令牌中指定的操作后恢复通知。resume_after修饰符接受一个必须解析为恢复令牌的值,例如下面示例中的resume_token。
resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()Node.js
You can use the 您可以使用resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.resumeAfter选项在恢复令牌中指定的操作后恢复通知。resumeAfter选项接受一个必须解析为恢复令牌的值,例如下面示例中的resumeToken。
const collection = db.collection('inventory');
const changeStream = collection.watch();
let newChangeStream;
changeStream
.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();
newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream
.on('change', next => {
processChange(next);
})
.once('error', error => {
// handle error
});
})
.once('error', error => {
// handle error
});PHP
You can use the 您可以使用resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. $resumeToken in the example below.resumeAfter选项在恢复令牌中指定的操作后恢复通知。resumeAfter选项接受一个必须解析为恢复令牌的值,例如下面示例中的$resumeToken。
$resumeToken = $changeStream->getResumeToken();
if ($resumeToken === null) {
throw new \Exception('Resume token was not found');
}
$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();
$firstChange = $changeStream->current();Python
You can use the 您可以使用resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.resume_after修饰符在恢复令牌中指定的操作后恢复通知。resume_after修饰符接受一个必须解析为恢复令牌的值,例如下面示例中的resume_token。
resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
next(cursor)Ruby
You can use the 您可以使用resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.resume_after修饰符在恢复令牌中指定的操作后恢复通知。resume_after修饰符接受一个必须解析为恢复令牌的值,例如下面示例中的resume_token。
change_stream = inventory.watch
cursor = change_stream.to_enum
next_change = cursor.next
resume_token = change_stream.resume_token
new_cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = new_cursor.nextSwift(Async)
You can use the 您可以使用resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.resumeAfter选项在恢复令牌中指定的操作后恢复通知。resumeAfter选项接受一个必须解析为恢复令牌的值,例如下面示例中的resumeToken。
let inventory = db.collection("inventory")
inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next().map { _ in
changeStream.resumeToken
}.always { _ in
_ = changeStream.kill()
}
}.flatMap { resumeToken in
inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
newStream.forEach { event in
// process event
print(event)
}
}
}Swift(Sync)
You can use the 您可以使用resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.resumeAfter选项在恢复令牌中指定的操作后恢复通知。resumeAfter选项接受一个必须解析为恢复令牌的值,例如下面示例中的resumeToken。
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()
let resumeToken = changeStream.resumeToken
let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
let nextAfterResume = resumedChangeStream.next()startAfter for Change Streams用于变更流
You can start a new change stream after a specific event by passing a resume token to 通过在打开游标时向startAfter when opening the cursor. Unlike resumeAfter, startAfter can resume notifications after an invalidate event by creating a new change stream.startAfter传递恢复令牌,可以在特定事件后启动新的更改流。与resumeAfter不同,startAfter可以通过创建新的更改流在无效事件后恢复通知。
See Resume Tokens for more information on the resume token.有关恢复令牌的更多信息,请参阅恢复令牌。
Important
The oplog must have enough history to locate the operation associated with the token or the timestamp, if the timestamp is in the past.如果时间戳是过去的,oplog必须有足够的历史记录来定位与令牌或时间戳相关的操作。
Resume Tokens恢复令牌
The resume token is available from multiple sources:恢复令牌可从多个来源获得:
_id field._id字段上都包含一个恢复令牌。 | |
Aggregation |
|
| Get More | getMore command includes a resume token on the cursor.postBatchResumeToken field.getMore命令在cursor.postBatchResumeToken字段中包含一个恢复令牌。 |
Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.从MongoDB 4.2开始,如果更改流聚合管道修改了事件的_id字段,则更改流将抛出异常。
Tip
MongoDB provides a "snippet", an extension to MongoDB提供了一个“片段”,这是mongosh, that decodes hex-encoded resume tokens.mongosh的扩展,可以解码十六进制编码的恢复令牌。
You can install and run the resumetoken snippet from 您可以从mongosh:mongosh安装并运行resumetoken代码段:
snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')
You can also run resumetoken from the command line (without using 如果你的系统上安装了npm,你也可以从命令行运mongosh) if npm is installed on your system:resumetoken(不使用mongosh):
npx mongodb-resumetoken-decoder <RESUME TOKEN>
See the following for more details on:有关更多详细信息,请参阅以下内容:
resumetokenusing snippets in在mongosh.mongosh中使用片段。
Resume Tokens from Change Events从更改事件中恢复令牌
Change event notifications include a resume token on the 更改事件通知在_id field:_id字段中包含一个恢复令牌:
{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"wallTime": ISODate("2022-10-19T15:37:04.604Z"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}Resume Tokens from aggregate来自aggregate的恢复令牌
aggregateWhen using the 使用aggregate command, the $changeStream aggregation stage includes a resume token on the cursor.postBatchResumeToken field:aggregate命令时,$changeStream聚合阶段在cursor.postBatchResumeToken字段上包含一个恢复令牌:
{
"cursor": {
"firstBatch": [],
"postBatchResumeToken": {
"_data": "8263515EAC000000022B0429296E1404"
},
"id": Long("4309380460777152828"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}Resume Tokens from getMore从getMore恢复令牌
getMoreThe getMore command also includes a resume token on the cursor.postBatchResumeToken field:getMore命令还在cursor.postBatchResumeToken字段中包含一个恢复令牌:
{
"cursor": {
"nextBatch": [],
"postBatchResumeToken": {
"_data": "8263515979000000022B0429296E1404"
},
"id": Long("7049907285270685005"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}Use Cases用例
Change streams can benefit architectures with reliant business systems, informing downstream systems once data changes are durable. For example, change streams can save time for developers when implementing Extract, Transform, and Load (ETL) services, cross-platform synchronization, collaboration functionality, and notification services.变更流可以使具有依赖业务系统的架构受益,一旦数据变更持久,就会通知下游系统。例如,变更流可以为开发人员在实现提取、转换和加载(ETL)服务、跨平台同步、协作功能和通知服务时节省时间。
Access Control访问控制
For deployments enforcing Authentication on Self-Managed Deployments and authorization:对于在自我管理部署上强制执行身份验证和授权的部署:
To open a change stream against specific collection, applications must have privileges that grant要针对特定集合打开更改流,应用程序必须具有授予changeStreamandfindactions on the corresponding collection.changeStream的权限,并在相应的集合上授予find操作。{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }To open a change stream on a single database, applications must have privileges that grant要在单个数据库上打开更改流,应用程序必须具有授予changeStreamandfindactions on all non-systemcollections in the database.changeStream的权限,并在数据库中的所有非系统集合上授予find操作。{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }To open a change stream on an entire deployment, applications must have privileges that grant要在整个部署上打开更改流,应用程序必须具有授予changeStreamandfindactions on all non-systemcollections for all databases in the deployment.changeStream的权限,并在部署中的所有数据库的所有非系统集合授予上find操作。{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
Event Notification事件通知
Change streams only notify on data changes that have persisted to a majority of data-bearing members in the replica set. This ensures that notifications are triggered only by majority-committed changes that are durable in failure scenarios.更改流仅通知已持久化到副本集中大多数数据承载成员的数据更改。这确保了通知仅由在失败情况下持久的大多数提交的更改触发。
For example, consider a 3-member replica set with a change stream cursor opened against the primary. 例如,考虑一个由3个成员组成的副本集,其中更改流游标在primary上打开。If a client issues an insert operation, the change stream only notifies the application of the data change once that insert has persisted to a majority of data-bearing members.如果客户端发出插入操作,则只有在插入持久化到大多数数据承载成员时,更改流才会通知应用程序数据更改。
If an operation is associated with a transaction, the change event document includes the 如果操作与事务相关联,则更改事件文档包括txnNumber and the lsid.txnNumber和lsid。
Collation排序规则
Change streams use 除非提供明确的排序规则,否则更改流使用simple binary comparisons unless an explicit collation is provided.simple二进制比较。
Change Streams and Orphan Documents更改流和孤立文档
Starting in MongoDB 5.3, during range migration, change stream events are not generated for updates to orphaned documents.从MongoDB 5.3开始,在范围迁移期间,不会为孤立文档的更新生成更改流事件。
Change Streams with Document Pre- and Post-Images使用文档前映像和后映像更改流
Starting in MongoDB 6.0, you can use change stream events to output the version of a document before and after changes (the document pre- and post-images):从MongoDB 6.0开始,您可以使用更改流事件输出更改前后的文档版本(文档前映像和后映像):
The pre-image is the document before it was replaced, updated, or deleted. There is no pre-image for an inserted document.前映像是替换、更新或删除之前的文档。插入的文档没有前映像。The post-image is the document after it was inserted, replaced, or updated. There is no post-image for a deleted document.后映像是插入、替换或更新后的文档。已删除文档没有后映像。Enable使用changeStreamPreAndPostImagesfor a collection usingdb.createCollection(),create, orcollMod.db.createCollection()、create或collMod为集合启用changeStreamPreAndPostImages。
Pre- and post-images are not available for a change stream event if the images were:如果映像是以下情况,则前映像和后映像对更改流事件不可用:
Not enabled on the collection at the time of a document update or delete operation.在文档更新或删除操作时,集合上未启用。Removed after the pre- and post-image retention time set in在expireAfterSeconds.expireAfterSeconds中设置的映像保留时间前后删除。The following example sets以下示例将整个集群上的expireAfterSecondsto100seconds on an entire cluster:expireAfterSeconds设置为100秒:use admin
db.runCommand( {
setClusterParameter:
{ changeStreamOptions: {
preAndPostImages: { expireAfterSeconds: 100 }
} }
} )The following example returns the current以下示例返回当前changeStreamOptionssettings, includingexpireAfterSeconds:changeStreamOptions设置,包括expireAfterSeconds:db.adminCommand( { getClusterParameter: "changeStreamOptions" } )Setting将expireAfterSecondstooffuses the default retention policy: pre- and post-images are retained until the corresponding change stream events are removed from the oplog.expireAfterSeconds设置为off(关闭)将使用默认保留策略:保留前后映像,直到从oplog中删除相应的更改流事件。If a change stream event is removed from the oplog, then the corresponding pre- and post-images are also deleted regardless of the如果从oplog中删除了更改流事件,则相应的前映像和后映像也将被删除,而不管映像保留时间为expireAfterSecondspre- and post-image retention time.expireAfterSeconds之前和之后。
Additional considerations:其他注意事项:
Enabling pre- and post-images consumes storage space and adds processing time. Only enable pre- and post-images if you need them.启用前映像和后映像会消耗存储空间并增加处理时间。仅在需要时启用前映像和后映像。Limit the change stream event size to less than 16 mebibytes. To limit the event size, you can:将更改流事件大小限制为小于16兆字节。要限制事件大小,您可以:Limit the document size to 8 megabytes. You can request pre- and post-images simultaneously in the change stream output if other change stream event fields like将文档大小限制为8 MB。如果其他更改流事件字段(如updateDescriptionare not large.updateDescription)不大,您可以在更改流输出中同时请求前映像和后映像。Request only post-images in the change stream output for documents up to 16 mebibytes if other change stream event fields like如果其他更改流事件字段(如updateDescriptionare not large.updateDescription)不大,则仅请求更改流输出中最多16兆字节的文档的后映像。Request only pre-images in the change stream output for documents up to 16 mebibytes if:在以下情况下,仅在更改流输出中请求最多16兆字节的文档的前映像:document updates affect only a small fraction of the document structure or content, and文档更新仅影响文档结构或内容的一小部分,以及do not cause a不要引起replacechange event. Areplaceevent always includes the post-image.replace事件。replace事件总是包括后映像。
To request a pre-image, you set要请求前映像,您可以在fullDocumentBeforeChangetorequiredorwhenAvailableindb.collection.watch(). To request a post-image, you setfullDocumentusing the same method.db.collection.watch()中将fullDocumentBeforeChange设置为required(必需)或whenAvailable(在可用时)。要请求后映像,您可以使用相同的方法设置fullDocument。Pre-images are written to the预映像会写入config.system.preimagescollection.config.system.preimages集合。Theconfig.system.preimagescollection may become large. To limit the collection size, you can setexpireAfterSecondstime for the pre-images as shown earlier.config.system.preimages集合可能会变大。要限制集合大小,您可以为预映像设置expireAfterSeconds时间,如前所示。Pre-images are removed asynchronously by a background process.前映像由后台进程异步删除。
Important
Backward-Incompatible Feature向后不兼容功能
Starting in MongoDB 6.0, if you are using document pre- and post-images for change streams, you must disable changeStreamPreAndPostImages for each collection using the 从MongoDB 6.0开始,如果您将文档预映像和后映像用于更改流,则必须使用collMod command before you can downgrade to an earlier MongoDB version.collMod命令为每个集合禁用changeStreamPreAndPostImages,然后才能降级到早期的MongoDB版本。
Tip
For change stream events and output, see Change Events.有关更改流事件和输出,请参阅更改事件。To watch a collection for changes, see要查看集合的更改,请参阅db.collection.watch().db.collection.watch()。For complete examples with the change stream output, see Change Streams with Document Pre- and Post-Images.有关更改流输出的完整示例,请参阅使用文档前后映像更改流。
For complete examples with the change stream output, see Change Streams with Document Pre- and Post-Images.有关更改流输出的完整示例,请参阅使用文档前后映像更改流。