Database Manual

Change Streams更改流

Change streams allow applications to access real-time data changes without the prior complexity and risk of manually tailing the oplog. 变更流允许应用程序访问实时数据变更,而无需事先手动跟踪oplog的复杂性和风险。Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.应用程序可以使用更改流订阅单个集合、数据库或整个部署上的所有数据更改,并立即对其做出反应。由于变更流使用聚合框架,应用程序还可以筛选特定的变更或随意转换通知。

Note

Change streams are restricted to database events. Atlas Stream Processing has extended functionality, including managing multiple data event types and processing streams of complex data using the same query API as Atlas databases. 更改流仅限于数据库事件。Atlas流处理具有扩展的功能,包括使用与Atlas数据库相同的查询API管理多个数据事件类型和处理复杂数据流。For more information, see Atlas Stream Processing.有关更多信息,请参阅Atlas流处理

Starting in MongoDB 5.1, change streams are optimized, providing more efficient resource utilization and faster execution of some aggregation pipeline stages.从MongoDB 5.1开始,更改流得到了优化,提供了更高效的资源利用率和更快地执行某些聚合管道阶段。

Availability可用性

Change streams are available for replica sets and sharded clusters:更改流可用于副本集分片集群

  • Storage Engine.存储引擎。

    The replica sets and sharded clusters must use the WiredTiger storage engine. Change streams can also be used on deployments that employ MongoDB's encryption-at-rest feature.副本集和分片群集必须使用WiredTiger存储引擎。变更流也可用于采用MongoDB静态加密功能的部署。

  • Replica Set Protocol Version.副本集协议版本。

    The replica sets and sharded clusters must use replica set protocol version 1 (pv1).副本集和分片集群必须使用副本集协议版本1(pv1)。

  • Read Concern "majority" Enablement.读取关注“多数”支持。

    Change streams are available regardless of the "majority" read concern support; that is, read concern majority support can be either enabled (default) or disabled to use change streams.无论是否支持"majority"读取关注,都可以使用更改流;也就是说,可以启用(默认)或禁用读取关注majority支持以使用更改流。

Stable API Support稳定的API支持

Change streams are included in Stable API V1. 变更流包含在稳定的API V1中。However, the showExpandedEvents option is not included in Stable API V1.但是,showExpandedEvents选项未包含在Stable API V1中。

Connect连接

Connections for a change stream can either use DNS seed lists with the +srv connection option or by listing the servers individually in the connection string.更改流的连接可以使用带有+srv连接选项的DNS种子列表,也可以在连接字符串中单独列出服务器。

If the driver loses the connection to a change stream or the connection goes down, it attempts to reestablish a connection to the change stream through another node in the cluster that has a matching read preference. 如果驱动程序失去与更改流的连接或连接中断,它会尝试通过群集中具有匹配读取首选项的另一个节点重新建立与更改流之间的连接。If the driver cannot find a node with the correct read preference, it throws an exception.如果驱动程序找不到具有正确读取首选项的节点,则会抛出异常。

For more information, see Connection String URI Format.有关更多信息,请参阅连接字符串URI格式

Watch a Collection, Database, or Deployment监视集合、数据库或部署

You can open change streams against:您可以针对以下对象打开更改流:

Target目标Description描述
A collection集合

You can open a change stream cursor for a single collection (except system collections, or any collections in the admin, local, and config databases).您可以为单个集合打开更改流游标(system集合或adminlocalconfig数据库中的任何集合除外)。

The examples on this page use the MongoDB drivers to open and work with a change stream cursor for a single collection. See also the mongosh method db.collection.watch().此页面上的示例使用MongoDB驱动程序打开并使用单个集合的更改流游标。另请参见mongosh方法db.collection.watch()

A database数据库

You can open a change stream cursor for a single database (excluding admin, local, and config database) to watch for changes to all its non-system collections.您可以打开单个数据库(不包括adminlocalconfig数据库)的更改流游标,以监视其所有非系统集合的更改。

For the MongoDB driver method, refer to your driver documentation. See also the mongosh method db.watch().有关MongoDB驱动程序方法,请参阅您的驱动程序文档。另请参见mongosh方法db.watch()

A deployment部署

You can open a change stream cursor for a deployment (either a replica set or a sharded cluster) to watch for changes to all non-system collections across all databases except for admin, local, and config.您可以打开部署(副本集或分片集群)的更改流游标,以监视除adminlocalconfig之外的所有数据库中所有非系统集合的更改。

For the MongoDB driver method, refer to your driver documentation. 有关MongoDB驱动程序方法,请参阅您的驱动程序文档。See also the mongosh method Mongo.watch().另请参见mongosh方法Mongo.watch()

Note

Change Stream Examples更改流示例

The examples on this page use the MongoDB drivers to illustrate how to open a change stream cursor for a collection and work with the change stream cursor.本页上的示例使用MongoDB驱动程序来说明如何打开集合的更改流游标并使用更改流游标。

Change Stream Performance Considerations更改流性能注意事项

If the amount of active change streams opened against a database exceeds the connection pool size, you may experience notification latency. 如果针对数据库打开的活动更改流的数量超过连接池大小,您可能会遇到通知延迟。Each change stream uses a connection and a getMore operation on the change stream for the period of time that it waits for the next event. 每个更改流在等待下一个事件的时间段内对更改流使用连接和getMore操作。To avoid any latency issues, you should ensure that the pool size is greater than the number of opened change streams. For details see the maxPoolSize setting.为了避免任何延迟问题,您应该确保池大小大于打开的更改流的数量。有关详细信息,请参阅maxPoolSize设置。

Sharded Cluster Considerations分片化集群考虑因素

When a change stream is opened on a sharded cluster:当在分片集群上打开更改流时:

  • The mongos creates individual change streams on each shard. This behavior occurs regardless of whether the change stream targets a particular shard key range.mongos在每个分片上创建单独的变更流。无论更改流是否针对特定的分片键范围,都会发生这种行为。
  • When the mongos receives change stream results, it sorts and filters those results. If needed, the mongos also performs a fullDocument lookup.mongos收到更改流结果时,它会对这些结果进行排序和筛选。如果需要,mongos还会执行完整的文档查找。

For best performance, limit the use of $lookup queries in change streams.为了获得最佳性能,请限制在更改流中使用$lookup查询。

Open A Change Stream打开变更流

To open a change stream:要打开更改流,请执行以下操作:

  • For a replica set, you can issue the open change stream operation from any of the data-bearing members.对于副本集,您可以从任何数据承载成员发出打开更改流操作。
  • For a sharded cluster, you must issue the open change stream operation from the mongos.对于分片集群,您必须从mongos发出开放更改流操作。

The following example opens a change stream for a collection and iterates over the cursor to retrieve the change stream documents. 以下示例为集合打开一个更改流,并在游标上迭代以检索更改流文档。[1]


Use the Select your language drop-down menu in the upper-right to set the language of the examples on this page.使用右上角的“选择语言”下拉菜单设置此页面上示例的语言。


C

The C examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.下面的C示例假设您已连接到MongoDB副本集,并访问了包含inventory集合的数据库

mongoc_collection_t *collection;
bson_t *pipeline = bson_new ();
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
const bson_t *resume_token;
bson_error_t error;

collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}

mongoc_change_stream_destroy (stream);
C#

The C# examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.下面的C#示例假设您已连接到MongoDB副本集,并访问了包含inventory集合的数据库

var cursor = inventory.Watch();
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();
Go

The Go examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.下面的Go示例假设您已连接到MongoDB副本集,并访问了包含inventory集合的数据库

cs, err := coll.Watch(ctx, mongo.Pipeline{})
assert.NoError(t, err)
defer cs.Close(ctx)

ok := cs.Next(ctx)
next := cs.Current
Java(Sync)

The Java examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.下面的Java示例假设您已连接到MongoDB副本集,并访问了包含inventory集合的数据库

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();
Catlin(Coroutine)

The Kotlin examples below assume that you are connected to a MongoDB replica set and can access a database that contains the inventory collection. 下面的Kotlin示例假设您连接到MongoDB副本集,并且可以访问包含inventory集合的数据库。To learn more about completing these tasks, see the Kotlin Coroutine Driver Databases and Collections guide.要了解有关完成这些任务的更多信息,请参阅Kotlin Coroutine驱动程序数据库和集合指南

val job = launch {
val changeStream = collection.watch()
changeStream.collect {
println("Received a change event: $it")
}
}
motor

The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.下面的示例假设您已连接到MongoDB副本集,并访问了包含inventory集合的数据库

cursor = db.inventory.watch()
document = await cursor.next()
Node.js

The Node.js examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.下面的Node.js示例假设您已连接到MongoDB副本集,并访问了包含inventory集合的数据库

The following example uses stream to process the change events.以下示例使用流来处理更改事件。

const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream
.on('change', next => {
// process next document
})
.once('error', () => {
// handle error
});

Alternatively, you can also use iterator to process the change events:或者,您还可以使用迭代器来处理更改事件:

const collection = db.collection('inventory');
const changeStream = collection.watch();
const next = await changeStream.next();

ChangeStream extends EventEmitter.ChangeStream扩展了EventEmitter

php

The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.下面的示例假设您已连接到MongoDB副本集,并访问了包含inventory集合的数据库

$changeStream = $db->inventory->watch();
$changeStream->rewind();

$firstChange = $changeStream->current();

$changeStream->next();

$secondChange = $changeStream->current();
python

The Python examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.下面的Python示例假设您已连接到MongoDB副本集,并访问了包含inventory集合的数据库

cursor = db.inventory.watch()
next(cursor)
Ruby

The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.下面的示例假设您已连接到MongoDB副本集,并访问了包含inventory集合的数据库

cursor = inventory.watch.to_enum
next_change = cursor.next
Swift(async)

The Swift (Async) examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

let inventory = db.collection("inventory")

// Option 1: retrieve next document via next()
let next = inventory.watch().flatMap { cursor in
cursor.next()
}

// Option 2: register a callback to execute for each document
let result = inventory.watch().flatMap { cursor in
cursor.forEach { event in
// process event
print(event)
}
}
Swift(Sync)

The Swift (Sync) examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.下面的Swift(Sync)示例假设您已连接到MongoDB副本集并访问了包含inventory集合的数据库

let inventory = db.collection("inventory")
let changeStream = try inventory.watch()
let next = changeStream.next()

To retrieve the data change event from the cursor, iterate the change stream cursor. For information on the change stream event, see Change Events.要从游标检索数据更改事件,请迭代更改流游标。有关更改流事件的信息,请参阅更改事件

The change stream cursor remains open until one of the following occurs:更改流游标将保持打开状态,直到出现以下情况之一:

  • The cursor is explicitly closed.游标已明确关闭。
  • An invalidate event occurs; for example, a collection drop or rename.发生无效事件;例如,删除或重命名集合。
  • The connection to the MongoDB deployment closes or times out. See Behavior for more information.与MongoDB部署的连接关闭或超时。有关更多信息,请参阅行为。
  • If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close. The closed change stream cursor may not be fully resumable.如果部署是分片集群,则分片删除可能会导致打开的更改流游标关闭。关闭的更改流游标可能无法完全恢复。

Note

The lifecycle of an unclosed cursor is language-dependent.未关闭游标的生命周期取决于语言。

[1] You can specify a startAtOperationTime to open the cursor at a particular point in time. If the specified starting point is in the past, it must be in the time range of the oplog.您可以指定startAtOperationTime以在特定时间点打开游标。如果指定的起点在过去,则它必须在oplog的时间范围内。

Modify Change Stream Output修改更改流输出


Use the Select your language drop-down menu in the upper-right to set the language of the examples on this page.使用右上角的“选择语言”下拉菜单设置此页面上示例的语言。


C

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出

pipeline = BCON_NEW ("pipeline",
"[",
"{",
"$match",
"{",
"fullDocument.username",
BCON_UTF8 ("alice"),
"}",
"}",
"{",
"$addFields",
"{",
"newField",
BCON_UTF8 ("this is an added field!"),
"}",
"}",
"]");

stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}

mongoc_change_stream_destroy (stream);
C#

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.FullDocument["username"] == "alice" ||
change.OperationType == ChangeStreamOperationType.Delete)
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
"{ $addFields : { newField : 'this is an added field!' } }");

var collection = database.GetCollection<BsonDocument>("inventory");
using (var cursor = collection.Watch(pipeline))
{
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
}
Go

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出

pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
bson.A{
bson.D{{"fullDocument.username", "alice"}},
bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
assert.NoError(t, err)
defer cs.Close(ctx)

ok := cs.Next(ctx)
next := cs.Current
Java(Sync)

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出

MongoClient mongoClient = MongoClients.create("mongodb://<username>:<password>@<host>:<port>");

// Select the MongoDB database and collection to open the change stream against

MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");

MongoCollection<Document> collection = db.getCollection("myTargetCollection");

// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
Document.parse("{'fullDocument.username': 'alice'}"),
Filters.in("operationType", asList("delete")))));

// Create the change stream cursor, passing the pipeline to the
// collection.watch() method

MongoCursor<Document> cursor = collection.watch(pipeline).iterator();

The pipeline list includes a single $match stage that filters for any operations that meet one or both of the following criteria:

  • username value is alice
  • operationType value is delete

Passing the pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline.

Kotlin(Coroutine)

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出

val pipeline = listOf(
Aggregates.match(
or(
eq("fullDocument.username", "alice"),
`in`("operationType", listOf("delete"))
)
))

val job = launch {
val changeStream = collection.watch(pipeline)
changeStream.collect {
println("Received a change event: $it")
}
}

The pipeline list includes a single $match stage that filters for any operations that meet one or both of the following criteria:pipeline列表包含一个$match阶段,用于筛选满足以下一个或两个条件的任何操作:

  • username value is 值是alice
  • operationType value is 值是delete

Passing the pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline.pipeline传递给watch()方法会指示更改流在通过指定管道后返回通知。

Motor

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = await cursor.next()
Node.js

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出

The following example uses stream to process the change events.以下示例使用流来处理更改事件。

const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];

const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream
.on('change', next => {
// process next document
})
.once('error', error => {
// handle error
});

Alternatively, you can also use iterator to process the change events:或者,您还可以使用迭代器来处理更改事件:

const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();
php

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出

$pipeline = [
['$match' => ['fullDocument.username' => 'alice']],
['$addFields' => ['newField' => 'this is an added field!']],
];
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();

$firstChange = $changeStream->current();

$changeStream->next();

$secondChange = $changeStream->current();
Python

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出

pipeline = [
{"$match": {"fullDocument.username": "alice"}},
{"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
next(cursor)
Ruby

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出

Swift(Async)

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")

// Option 1: use next() to iterate
let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.next()
}

// Option 2: register a callback to execute for each document
let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}
Swift(Sync)

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置更改流时,您可以通过提供以下一个或多个管道阶段的数组来控制更改流输出

let pipeline: [BSONDocument] = [
["$match": ["fullDocument.username": "alice"]],
["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
let next = changeStream.next()

Tip

The _id field of the change stream event document act as the resume token. Do not use the pipeline to modify or remove the change stream event's _id field.更改流事件文档的_id字段充当恢复令牌。不要使用管道修改或删除更改流事件的_id字段。

Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.从MongoDB 4.2开始,如果更改流聚合管道修改了事件的_id字段,则更改流将抛出异常。

See Change Events for more information on the change stream response document format.有关更改流响应文档格式的更多信息,请参阅更改事件

Lookup Full Document for Update Operations查找更新操作的完整文档

By default, change streams only return the delta of fields during the update operation. However, you can configure the change stream to return the most current majority-committed version of the updated document.默认情况下,更改流在更新操作期间仅返回字段的增量。但是,您可以配置更改流以返回更新文档的最新多数提交版本。


Use the Select your language drop-down menu in the upper-right to set the language of the examples on this page.使用右上角的“选择语言”下拉菜单设置此页面上示例的语言。


C

To return the most current majority-committed version of the updated document, pass the "fullDocument" option with the "updateLookup" value to the mongoc_collection_watch method.要返回更新文档的最新多数提交版本,请将带有"updateLookup"值的"fullDocument"选项传递给mongo_collection_watch方法。

In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.在下面的示例中,所有更新操作通知都包含一个fullDocument字段,表示受更新操作影响的文档的当前版本。

BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}

mongoc_change_stream_destroy (stream);
C#

To return the most current majority-committed version of the updated document, pass "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" to the db.collection.watch() method.要返回更新文档的最新多数提交版本,请将"FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"传递给db.collection.watch()方法。

In the example below, all update operations notifications include a FullDocument field that represents the current version of the document affected by the update operation.在下面的示例中,所有更新操作通知都包含一个FullDocument字段,该字段表示受更新操作影响的文档的当前版本。

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var cursor = inventory.Watch(options);
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();
Go

To return the most current majority-committed version of the updated document, SetFullDocument(options.UpdateLookup) change stream option.要返回更新文档的最新多数提交版本,请使用SetFullDocument(options.UpdateLookup)更改流选项。

cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
assert.NoError(t, err)
defer cs.Close(ctx)

ok := cs.Next(ctx)
next := cs.Current
Java(Sync)

To return the most current majority-committed version of the updated document, pass FullDocument.UPDATE_LOOKUP to the db.collection.watch.fullDocument() method.要返回更新文档的最新多数提交版本,请将FullDocument.UPDATE_LOOKUP传递给db.collection.watch.fullDocument()方法。

In the example below, all update operations notifications include a FullDocument field that represents the current version of the document affected by the update operation.在下面的示例中,所有更新操作通知都包含一个FullDocument字段,该字段表示受更新操作影响的文档的当前版本。

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();
Kotlin(Coroutine)

To return the most current majority-committed version of the updated document, pass FullDocument.UPDATE_LOOKUP to the ChangeStreamFlow.fullDocument() method.要返回更新文档的最新多数提交版本,请将FullDocument.UPDATE_LOOKUP传递给ChangeStreamFlow.fullDocument()方法。

In the example below, all update operations notifications include a FullDocument field that represents the current version of the document affected by the update operation.在下面的示例中,所有更新操作通知都包含一个FullDocument字段,该字段表示受更新操作影响的文档的当前版本。

val job = launch {
val changeStream = collection.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP)
changeStream.collect {
println(it)
}
}
Motor

To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method.要返回更新文档的最新多数提交版本,请将full_document='updateLookup'传递给db.collection.watch()方法。

In the example below, all update operations notifications include a `full_document field that represents the current version of the document affected by the update operation.在下面的示例中,所有更新操作通知都包含一个full_document字段,表示受更新操作影响的文档的当前版本。

cursor = db.inventory.watch(full_document="updateLookup")
document = await cursor.next()
Node.js

To return the most current majority-committed version of the updated document, pass { fullDocument: 'updateLookup' } to the db.collection.watch() method.要返回更新文档的最新多数提交版本,请将{ fullDocument: 'updateLookup' }传递给db.collection.watch()方法。

In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.

The following example uses stream to process the change events.以下示例使用流来处理更改事件。

const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream
.on('change', next => {
// process next document
})
.once('error', error => {
// handle error
});

Alternatively, you can also use iterator to process the change events:或者,您还可以使用迭代器来处理更改事件:

const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();
PHP

To return the most current majority-committed version of the updated document, pass "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP" to the db.watch() method.要返回更新文档的最新多数提交版本,请将"fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"传递给db.watch()方法。

In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.在下面的示例中,所有更新操作通知都包含一个fullDocument字段,表示受更新操作影响的文档的当前版本。

$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();

$firstChange = $changeStream->current();

$changeStream->next();

$secondChange = $changeStream->current();
Python

To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method.要返回更新文档的最新多数提交版本,请将full_document='updateLookup'传递给db.collection.watch()方法。

In the example below, all update operations notifications include a full_document field that represents the current version of the document affected by the update operation.在下面的示例中,所有更新操作通知都包含一个full_document字段,该字段表示受更新操作影响的文档的当前版本。

cursor = db.inventory.watch(full_document="updateLookup")
next(cursor)
Ruby

To return the most current majority-committed version of the updated document, pass full_document: 'updateLookup' to the db.watch() method.要返回更新文档的最新多数提交版本,请将full_document: 'updateLookup'传递给db.watch()方法。

In the example below, all update operations notifications include a full_document field that represents the current version of the document affected by the update operation.在下面的示例中,所有更新操作通知都包含一个full_document字段,该字段表示受更新操作影响的文档的当前版本。

cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next
Swift(Async)

To return the most current majority-committed version of the updated document, pass options: ChangeStreamOptions(fullDocument: .updateLookup) to the watch() method.要返回更新文档的最新多数提交版本,请将options: ChangeStreamOptions(fullDocument: .updateLookup)传递给watch()方法。

let inventory = db.collection("inventory")

// Option 1: use next() to iterate
let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next()
}

// Option 2: register a callback to execute for each document
let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.forEach { event in
// process event
print(event)
}
}
Swift(Sync)

To return the most current majority-committed version of the updated document, pass options: ChangeStreamOptions(fullDocument: .updateLookup) to the watch() method.要返回更新文档的最新多数提交版本,请将options: ChangeStreamOptions(fullDocument: .updateLookup)传递给watch()方法。

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()

Note

If there are one or more majority-committed operations that modified the updated document after the update operation but before the lookup, the full document returned may differ significantly from the document at the time of the update operation.如果在更新操作之后但在查找之前有一个或多个多数提交的操作修改了更新的文档,则返回的完整文档可能与更新操作时的文档存在显著差异。

However, the deltas included in the change stream document always correctly describe the watched collection changes that applied to that change stream event.然而,变更流文档中包含的增量总是正确地描述了应用于该变更流事件的所关注的集合变更。

The fullDocument field for an update event may be missing if one of the following is true:如果以下情况之一为真,则更新事件的fullDocument字段可能会丢失:

  • If the document is deleted or if the collection is dropped in between the update and the lookup.如果文档被删除,或者集合在更新和查找之间被删除。
  • If the update changes the values for at least one of the fields in that collection's shard key.如果更新更改了该集合分片键中至少一个字段的值。

See Change Events for more information on the change stream response document format.有关更改流响应文档格式的更多信息,请参阅更改事件

Resume a Change Stream恢复更改流

Change streams are resumable by specifying a resume token to either resumeAfter or startAfter when opening the cursor.打开游标时,通过将恢复令牌指定为resumeAfterstartAfter,可以恢复更改流。

resumeAfter for Change Streams用于变更流

You can resume a change stream after a specific event by passing a resume token to resumeAfter when opening the cursor.在打开游标时,通过向resumeAfter传递恢复令牌,可以在特定事件后恢复更改流。

See Resume Tokens for more information on the resume token.有关恢复令牌的更多信息,请参阅恢复令牌

Important

  • The oplog must have enough history to locate the operation associated with the token or the timestamp, if the timestamp is in the past.如果时间戳是过去的,oplog必须有足够的历史记录来定位与令牌或时间戳相关的操作。
  • You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. 无效事件(例如,集合删除或重命名)关闭更改流后,您不能使用resumeAfter来恢复更改流。Instead, you can use startAfter to start a new change stream after an invalidate event.相反,您可以使用startAfter无效事件后启动新的更改流。
C

In the example below, the resumeAfter option is appended to the stream options to recreate the stream after it has been destroyed. Passing the _id to the change stream attempts to resume notifications starting after the operation specified.在下面的示例中,resumeAfter选项被附加到流选项中,以便在流被销毁后重新创建。将_id传递给更改流会尝试恢复指定操作后开始的通知。

stream = mongoc_collection_watch (collection, pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
resume_token = mongoc_change_stream_get_resume_token (stream);
BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);

mongoc_change_stream_destroy (stream);
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
mongoc_change_stream_destroy (stream);
} else {
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
MONGOC_ERROR ("%s\n", error.message);
}

mongoc_change_stream_destroy (stream);
}
C#

In the example below, the resumeToken is retrieved from the last change stream document and passed to the Watch() method as an option. 在下面的示例中,resumeToken从最后一个更改流文档中检索,并作为选项传递给Watch()方法。Passing the resumeToken to the Watch() method directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.resumeToken传递给Watch()方法会指示更改流尝试恢复在恢复令牌中指定的操作之后开始的通知。

  var resumeToken = previousCursor.GetResumeToken();
var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
var cursor = inventory.Watch(options);
cursor.MoveNext();
var next = cursor.Current.First();
cursor.Dispose();
Go

You can use ChangeStreamOptions.SetResumeAfter to specify the resume token for the change stream. 您可以使用ChangeStreamOptions.SetResumeAfter为更改流指定恢复令牌。If the resumeAfter option is set, the change stream resumes notifications after the operation specified in the resume token. 如果设置了resumeAfter选项,则更改流将在恢复令牌中指定的操作后恢复通知。The SetResumeAfter takes a value that must resolve to a resume token, e.g. resumeToken in the example below.

resumeToken := original.ResumeToken()

cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
assert.NoError(t, err)
defer cs.Close(ctx)

ok = cs.Next(ctx)
result := cs.Current
Java(Sync)

You can use the resumeAfter() method to resume notifications after the operation specified in the resume token. The resumeAfter() method takes a value that must resolve to a resume token, e.g. resumeToken in the example below.您可以使用resumeAfter()方法在恢复令牌中指定的操作后恢复通知。resumeAfter()方法接受一个必须解析为恢复令牌的值,例如下面示例中的resumeToken

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();
Kotlin(Coroutine)

You can use the ChangeStreamFlow.resumeAfter() method to resume notifications after the operation specified in the resume token. 您可以使用ChangeStreamFlow.resumeAfter()方法在恢复令牌中指定的操作后恢复通知。The resumeAfter() method takes a value that must resolve to a resume token, such as the resumeToken variable in the example below.resumeAfter()方法接受一个必须解析为恢复令牌的值,例如下面示例中的resumeToken变量。

val resumeToken = BsonDocument()
val job = launch {
val changeStream = collection.watch()
.resumeAfter(resumeToken)
changeStream.collect {
println(it)
}
}
Motor

You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.您可以使用resume_after修饰符在恢复令牌中指定的操作后恢复通知。resume_after修饰符接受一个必须解析为恢复令牌的值,例如下面示例中的resume_token。

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()
Node.js

You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.您可以使用resumeAfter选项在恢复令牌中指定的操作后恢复通知。resumeAfter选项接受一个必须解析为恢复令牌的值,例如下面示例中的resumeToken

const collection = db.collection('inventory');
const changeStream = collection.watch();

let newChangeStream;
changeStream
.once('change', next => {
const resumeToken = changeStream.resumeToken;
changeStream.close();

newChangeStream = collection.watch([], { resumeAfter: resumeToken });
newChangeStream
.on('change', next => {
processChange(next);
})
.once('error', error => {
// handle error
});
})
.once('error', error => {
// handle error
});
PHP

You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. $resumeToken in the example below.您可以使用resumeAfter选项在恢复令牌中指定的操作后恢复通知。resumeAfter选项接受一个必须解析为恢复令牌的值,例如下面示例中的$resumeToken

$resumeToken = $changeStream->getResumeToken();

if ($resumeToken === null) {
throw new \Exception('Resume token was not found');
}

$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();

$firstChange = $changeStream->current();
Python

You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.您可以使用resume_after修饰符在恢复令牌中指定的操作后恢复通知。resume_after修饰符接受一个必须解析为恢复令牌的值,例如下面示例中的resume_token

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
next(cursor)
Ruby

You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.您可以使用resume_after修饰符在恢复令牌中指定的操作后恢复通知。resume_after修饰符接受一个必须解析为恢复令牌的值,例如下面示例中的resume_token

  change_stream = inventory.watch
cursor = change_stream.to_enum
next_change = cursor.next
resume_token = change_stream.resume_token

new_cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = new_cursor.next
Swift(Async)

You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.您可以使用resumeAfter选项在恢复令牌中指定的操作后恢复通知。resumeAfter选项接受一个必须解析为恢复令牌的值,例如下面示例中的resumeToken

let inventory = db.collection("inventory")

inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
.flatMap { changeStream in
changeStream.next().map { _ in
changeStream.resumeToken
}.always { _ in
_ = changeStream.kill()
}
}.flatMap { resumeToken in
inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
newStream.forEach { event in
// process event
print(event)
}
}
}
Swift(Sync)

You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.您可以使用resumeAfter选项在恢复令牌中指定的操作后恢复通知。resumeAfter选项接受一个必须解析为恢复令牌的值,例如下面示例中的resumeToken

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()

let resumeToken = changeStream.resumeToken
let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
let nextAfterResume = resumedChangeStream.next()

startAfter for Change Streams用于变更流

You can start a new change stream after a specific event by passing a resume token to startAfter when opening the cursor. Unlike resumeAfter, startAfter can resume notifications after an invalidate event by creating a new change stream.通过在打开游标时向startAfter传递恢复令牌,可以在特定事件后启动新的更改流。与resumeAfter不同,startAfter可以通过创建新的更改流在无效事件后恢复通知。

See Resume Tokens for more information on the resume token.有关恢复令牌的更多信息,请参阅恢复令牌

Important

  • The oplog must have enough history to locate the operation associated with the token or the timestamp, if the timestamp is in the past.如果时间戳是过去的,oplog必须有足够的历史记录来定位与令牌或时间戳相关的操作。

Resume Tokens恢复令牌

The resume token is available from multiple sources:恢复令牌可从多个来源获得:

SourceDescription描述
Change Events更改事件Each change event notification includes a resume token on the _id field.每个更改事件通知在_id字段上都包含一个恢复令牌。
Aggregation

The $changeStream aggregation stage includes a resume token on the cursor.postBatchResumeToken field.$changeStream聚合阶段在cursor.postBatchResumeToken字段中包含一个恢复令牌。

This field only appears when using the aggregate command.此字段仅在使用aggregate命令时显示。

Get MoreThe getMore command includes a resume token on the cursor.postBatchResumeToken field.getMore命令在cursor.postBatchResumeToken字段中包含一个恢复令牌。

Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.从MongoDB 4.2开始,如果更改流聚合管道修改了事件的_id字段,则更改流将抛出异常。

Tip

MongoDB provides a "snippet", an extension to mongosh, that decodes hex-encoded resume tokens.MongoDB提供了一个“片段”,这是mongosh的扩展,可以解码十六进制编码的恢复令牌。

You can install and run the resumetoken snippet from mongosh:您可以从mongosh安装并运行resumetoken代码段:

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

You can also run resumetoken from the command line (without using mongosh) if npm is installed on your system:如果你的系统上安装了npm,你也可以从命令行运resumetoken(不使用mongosh):

npx mongodb-resumetoken-decoder <RESUME TOKEN>

See the following for more details on:有关更多详细信息,请参阅以下内容:

Resume Tokens from Change Events从更改事件中恢复令牌

Change event notifications include a resume token on the _id field:更改事件通知在_id字段中包含一个恢复令牌:

{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"wallTime": ISODate("2022-10-19T15:37:04.604Z"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}

Resume Tokens from aggregate来自aggregate的恢复令牌

When using the aggregate command, the $changeStream aggregation stage includes a resume token on the cursor.postBatchResumeToken field:使用aggregate命令时,$changeStream聚合阶段在cursor.postBatchResumeToken字段上包含一个恢复令牌:

{
"cursor": {
"firstBatch": [],
"postBatchResumeToken": {
"_data": "8263515EAC000000022B0429296E1404"
},
"id": Long("4309380460777152828"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}

Resume Tokens from getMoregetMore恢复令牌

The getMore command also includes a resume token on the cursor.postBatchResumeToken field:getMore命令还在cursor.postBatchResumeToken字段中包含一个恢复令牌:

{
"cursor": {
"nextBatch": [],
"postBatchResumeToken": {
"_data": "8263515979000000022B0429296E1404"
},
"id": Long("7049907285270685005"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}

Use Cases用例

Change streams can benefit architectures with reliant business systems, informing downstream systems once data changes are durable. For example, change streams can save time for developers when implementing Extract, Transform, and Load (ETL) services, cross-platform synchronization, collaboration functionality, and notification services.变更流可以使具有依赖业务系统的架构受益,一旦数据变更持久,就会通知下游系统。例如,变更流可以为开发人员在实现提取、转换和加载(ETL)服务、跨平台同步、协作功能和通知服务时节省时间。

Access Control访问控制

For deployments enforcing Authentication on Self-Managed Deployments and authorization:对于在自我管理部署上强制执行身份验证授权的部署:

  • To open a change stream against specific collection, applications must have privileges that grant changeStream and find actions on the corresponding collection.要针对特定集合打开更改流,应用程序必须具有授予changeStream的权限,并在相应的集合上授予find操作。

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • To open a change stream on a single database, applications must have privileges that grant changeStream and find actions on all non-system collections in the database.要在单个数据库上打开更改流,应用程序必须具有授予changeStream的权限,并在数据库中的所有非系统集合上授予find操作。

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • To open a change stream on an entire deployment, applications must have privileges that grant changeStream and find actions on all non-system collections for all databases in the deployment.要在整个部署上打开更改流,应用程序必须具有授予changeStream的权限,并在部署中的所有数据库的所有非系统集合授予上find操作。

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

Event Notification事件通知

Change streams only notify on data changes that have persisted to a majority of data-bearing members in the replica set. This ensures that notifications are triggered only by majority-committed changes that are durable in failure scenarios.更改流仅通知已持久化到副本集中大多数数据承载成员的数据更改。这确保了通知仅由在失败情况下持久的大多数提交的更改触发。

For example, consider a 3-member replica set with a change stream cursor opened against the primary. 例如,考虑一个由3个成员组成的副本集,其中更改流游标在primary上打开。If a client issues an insert operation, the change stream only notifies the application of the data change once that insert has persisted to a majority of data-bearing members.如果客户端发出插入操作,则只有在插入持久化到大多数数据承载成员时,更改流才会通知应用程序数据更改。

If an operation is associated with a transaction, the change event document includes the txnNumber and the lsid.如果操作与事务相关联,则更改事件文档包括txnNumberlsid

Collation排序规则

Change streams use simple binary comparisons unless an explicit collation is provided.除非提供明确的排序规则,否则更改流使用simple二进制比较。

Change Streams and Orphan Documents更改流和孤立文档

Starting in MongoDB 5.3, during range migration, change stream events are not generated for updates to orphaned documents.从MongoDB 5.3开始,在范围迁移期间,不会为孤立文档的更新生成更改流事件。

Change Streams with Document Pre- and Post-Images使用文档前映像和后映像更改流

Starting in MongoDB 6.0, you can use change stream events to output the version of a document before and after changes (the document pre- and post-images):从MongoDB 6.0开始,您可以使用更改流事件输出更改前后的文档版本(文档前映像和后映像):

  • The pre-image is the document before it was replaced, updated, or deleted. There is no pre-image for an inserted document.前映像是替换、更新或删除之前的文档。插入的文档没有前映像。
  • The post-image is the document after it was inserted, replaced, or updated. There is no post-image for a deleted document.后映像是插入、替换或更新后的文档。已删除文档没有后映像。
  • Enable changeStreamPreAndPostImages for a collection using db.createCollection(), create, or collMod.使用db.createCollection()createcollMod为集合启用changeStreamPreAndPostImages

Pre- and post-images are not available for a change stream event if the images were:如果映像是以下情况,则前映像和后映像对更改流事件不可用:

  • Not enabled on the collection at the time of a document update or delete operation.在文档更新或删除操作时,集合上未启用。
  • Removed after the pre- and post-image retention time set in expireAfterSeconds.expireAfterSeconds中设置的映像保留时间前后删除。

    • The following example sets expireAfterSeconds to 100 seconds on an entire cluster:以下示例将整个集群上的expireAfterSeconds设置为100秒:

      use admin
      db.runCommand( {
      setClusterParameter:
      { changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 }
      } }
      } )
    • The following example returns the current changeStreamOptions settings, including expireAfterSeconds:以下示例返回当前changeStreamOptions设置,包括expireAfterSeconds

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • Setting expireAfterSeconds to off uses the default retention policy: pre- and post-images are retained until the corresponding change stream events are removed from the oplog.expireAfterSeconds设置为off(关闭)将使用默认保留策略:保留前后映像,直到从oplog中删除相应的更改流事件。
    • If a change stream event is removed from the oplog, then the corresponding pre- and post-images are also deleted regardless of the expireAfterSeconds pre- and post-image retention time.如果从oplog中删除了更改流事件,则相应的前映像和后映像也将被删除,而不管映像保留时间为expireAfterSeconds之前和之后。

Additional considerations:其他注意事项:

  • Enabling pre- and post-images consumes storage space and adds processing time. Only enable pre- and post-images if you need them.启用前映像和后映像会消耗存储空间并增加处理时间。仅在需要时启用前映像和后映像。
  • Limit the change stream event size to less than 16 mebibytes. To limit the event size, you can:将更改流事件大小限制为小于16兆字节。要限制事件大小,您可以:

    • Limit the document size to 8 megabytes. You can request pre- and post-images simultaneously in the change stream output if other change stream event fields like updateDescription are not large.将文档大小限制为8 MB。如果其他更改流事件字段(如updateDescription)不大,您可以在更改流输出中同时请求前映像和后映像。
    • Request only post-images in the change stream output for documents up to 16 mebibytes if other change stream event fields like updateDescription are not large.如果其他更改流事件字段(如updateDescription)不大,则仅请求更改流输出中最多16兆字节的文档的后映像。
    • Request only pre-images in the change stream output for documents up to 16 mebibytes if:在以下情况下,仅在更改流输出中请求最多16兆字节的文档的前映像:

      • document updates affect only a small fraction of the document structure or content, and文档更新仅影响文档结构或内容的一小部分,以及
      • do not cause a replace change event. A replace event always includes the post-image.不要引起replace事件。replace事件总是包括后映像。
  • To request a pre-image, you set fullDocumentBeforeChange to required or whenAvailable in db.collection.watch(). To request a post-image, you set fullDocument using the same method.要请求前映像,您可以在db.collection.watch()中将fullDocumentBeforeChange设置为required(必需)或whenAvailable(在可用时)。要请求后映像,您可以使用相同的方法设置fullDocument
  • Pre-images are written to the config.system.preimages collection.预映像会写入config.system.preimages集合。

    • The config.system.preimages collection may become large. To limit the collection size, you can set expireAfterSeconds time for the pre-images as shown earlier.config.system.preimages集合可能会变大。要限制集合大小,您可以为预映像设置expireAfterSeconds时间,如前所示。
    • Pre-images are removed asynchronously by a background process.前映像由后台进程异步删除。

Important

Backward-Incompatible Feature向后不兼容功能

Starting in MongoDB 6.0, if you are using document pre- and post-images for change streams, you must disable changeStreamPreAndPostImages for each collection using the collMod command before you can downgrade to an earlier MongoDB version.从MongoDB 6.0开始,如果您将文档预映像和后映像用于更改流,则必须使用collMod命令为每个集合禁用changeStreamPreAndPostImages,然后才能降级到早期的MongoDB版本。

Tip

For complete examples with the change stream output, see Change Streams with Document Pre- and Post-Images.有关更改流输出的完整示例,请参阅使用文档前后映像更改流