Change Streams

Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

Starting in MongoDB 5.1, change streams are optimized, providing more efficient resource utilization and faster execution of some aggregation pipeline stages.

Availability

Change streams are available for replica sets and sharded clusters:

  • Storage Engine.

    The replica sets and sharded clusters must use the WiredTiger storage engine. Change streams can also be used on deployments that employ MongoDB's encryption-at-rest feature.

  • Replica Set Protocol Version.

    The replica sets and sharded clusters must use replica set protocol version 1 (pv1).

  • Read Concern "majority" Enablement.

    Starting in MongoDB 4.2, change streams are available regardless of "majority" read concern support; that is, read concern "majority" support can be either enabled (default) or disabled to use change streams.

    In MongoDB 4.0 and earlier, change streams are available only if "majority" read concern support is enabled (default).

Connect

Connections for a change stream can either use DNS seed lists with the +srv connection option or list the servers individually in the connection string.

If the driver loses the connection to a change stream or the connection goes down, it attempts to reestablish a connection to the change stream through another node in the cluster that has a matching read preference. If the driver cannot find a node with the correct read preference, it throws an exception.

For more information, see Connection String URI Format.

Watch a Collection, Database, or Deployment

You can open change streams against:

Target: A collection

Description: You can open a change stream cursor for a single collection (except system collections, or any collections in the admin, local, and config databases).

The examples on this page use the MongoDB drivers to open and work with a change stream cursor for a single collection. See also the mongosh method db.collection.watch().

Target: A database

Description: Starting in MongoDB 4.0, you can open a change stream cursor for a single database (excluding the admin, local, and config databases) to watch for changes to all its non-system collections.

For the MongoDB driver method, refer to your driver documentation. See also the mongosh method db.watch().

Target: A deployment

Description: Starting in MongoDB 4.0, you can open a change stream cursor for a deployment (either a replica set or a sharded cluster) to watch for changes to all non-system collections across all databases except for admin, local, and config.

For the MongoDB driver method, refer to your driver documentation. See also the mongosh method Mongo.watch().

Note

Change Stream Examples

The examples on this page use the MongoDB drivers to illustrate how to open a change stream cursor for a collection and work with the change stream cursor.

Open a Change Stream

To open a change stream:

  • For a replica set, you can issue the open change stream operation from any of the data-bearing members.

  • For a sharded cluster, you must issue the open change stream operation from the mongos.

The following example opens a change stream for a collection and iterates over the cursor to retrieve the change stream documents. [1]


The C examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

mongoc_collection_t *collection;
bson_t *pipeline = bson_new ();
bson_t opts = BSON_INITIALIZER;
mongoc_change_stream_t *stream;
const bson_t *change;
const bson_t *resume_token;
bson_error_t error;
collection = mongoc_database_get_collection (db, "inventory");
stream = mongoc_collection_watch (collection, pipeline, NULL /* opts */);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
   MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

The C# examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

var cursor = inventory.Watch();
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

The Go examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

cs, err := coll.Watch(ctx, mongo.Pipeline{})
require.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

The Java examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

MongoCursor<ChangeStreamDocument<Document>> cursor = inventory.watch().iterator();
ChangeStreamDocument<Document> next = cursor.next();

The Motor examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

cursor = db.inventory.watch()
document = await cursor.next()

The Node.js examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

The following example uses a stream to process the change events.

const collection = db.collection('inventory');
const changeStream = collection.watch();
changeStream.on('change', next => {
  // process next document
});

Alternatively, you can use an iterator to process the change events:

const changeStreamIterator = collection.watch();
const next = await changeStreamIterator.next();

The PHP examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

$changeStream = $db->inventory->watch();
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

The Python examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

cursor = db.inventory.watch()
document = next(cursor)

The Ruby examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

cursor = inventory.watch.to_enum
next_change = cursor.next

The Swift (Async) examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

let inventory = db.collection("inventory")
// Option 1: retrieve next document via next()
let next = inventory.watch().flatMap { cursor in
    cursor.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch().flatMap { cursor in
    cursor.forEach { event in
        // process event
        print(event)
    }
}

The Swift (Sync) examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory collection.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch()
let next = changeStream.next()

To retrieve the data change event from the cursor, iterate the change stream cursor. For information on the change stream event, see Change Events.

While the connection to the MongoDB deployment remains open, the cursor remains open until one of the following occurs:

  • The cursor is explicitly closed.

  • An invalidate event occurs.

  • If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.

Note

The lifecycle of an unclosed cursor is language-dependent.

[1] Starting in MongoDB 4.0, you can specify a startAtOperationTime to open the cursor at a particular point in time. If the specified starting point is in the past, it must be in the time range of the oplog.

Modify Change Stream Output



You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

pipeline = BCON_NEW ("pipeline",
                     "[",
                     "{",
                     "$match",
                     "{",
                     "fullDocument.username",
                     BCON_UTF8 ("alice"),
                     "}",
                     "}",
                     "{",
                     "$addFields",
                     "{",
                     "newField",
                     BCON_UTF8 ("this is an added field!"),
                     "}",
                     "}",
                     "]");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
   MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
    .Match(change =>
        change.FullDocument["username"] == "alice" ||
        change.OperationType == ChangeStreamOperationType.Delete)
    .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
        "{ $addFields : { newField : 'this is an added field!' } }");
var collection = database.GetCollection<BsonDocument>("inventory");
using (var cursor = collection.Watch(pipeline))
{
    while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
    var next = cursor.Current.First();
}

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
	bson.A{
		bson.D{{"fullDocument.username", "alice"}},
		bson.D{{"operationType", "delete"}}}}},
}}}
cs, err := coll.Watch(ctx, pipeline)
require.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

MongoClient mongoClient = new MongoClient( new MongoClientURI("mongodb://host1:port1,host2:port2..."));
// Select the MongoDB database and collection to open the change stream against
MongoDatabase db = mongoClient.getDatabase("myTargetDatabase");
MongoCollection<Document> collection = db.getCollection("myTargetCollection");
// Create $match pipeline stage.
List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
    Document.parse("{'fullDocument.username': 'alice'}"),
    Filters.in("operationType", asList("delete")))));
// Create the change stream cursor, passing the pipeline to the
// collection.watch() method
MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch(pipeline).iterator();

The pipeline list includes a single $match stage that keeps only operations where the username is alice or where the operationType is delete.

Passing the pipeline to the watch() method directs the change stream to return notifications after passing them through the specified pipeline.
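
The same $match filter can also be written as plain data. The following is a minimal sketch in Python, assuming a PyMongo-style driver that accepts the pipeline as a list of dictionaries; build_match_pipeline is a hypothetical helper name used only for illustration and is not part of any driver.

```python
def build_match_pipeline(username):
    """Return a one-stage pipeline that keeps events whose full document
    has the given username, or whose operationType is "delete"."""
    return [
        {
            "$match": {
                "$or": [
                    {"fullDocument.username": username},
                    {"operationType": {"$in": ["delete"]}},
                ]
            }
        }
    ]


pipeline = build_match_pipeline("alice")
# With a live collection handle (assumed, not created here), the pipeline
# would be passed along as: cursor = db.inventory.watch(pipeline=pipeline)
```

Building the pipeline in a function keeps the filter testable without a running deployment.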

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

pipeline = [
    {"$match": {"fullDocument.username": "alice"}},
    {"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = await cursor.next()

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

The following example uses a stream to process the change events.

const pipeline = [
  { $match: { 'fullDocument.username': 'alice' } },
  { $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
  // process next document
});

Alternatively, you can use an iterator to process the change events:

const changeStreamIterator = collection.watch(pipeline);
const next = await changeStreamIterator.next();

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

$pipeline = [
    ['$match' => ['fullDocument.username' => 'alice']],
    ['$addFields' => ['newField' => 'this is an added field!']],
];
$changeStream = $db->inventory->watch($pipeline);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

pipeline = [
    {"$match": {"fullDocument.username": "alice"}},
    {"$addFields": {"newField": "this is an added field!"}},
]
cursor = db.inventory.watch(pipeline=pipeline)
document = next(cursor)

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

let pipeline: [BSONDocument] = [
    ["$match": ["fullDocument.username": "alice"]],
    ["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
    changeStream.next()
}
// Option 2: register a callback to execute for each document
let result = inventory.watch(pipeline, withEventType: BSONDocument.self).flatMap { changeStream in
    changeStream.forEach { event in
        // process event
        print(event)
    }
}

You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:

let pipeline: [BSONDocument] = [
    ["$match": ["fullDocument.username": "alice"]],
    ["$addFields": ["newField": "this is an added field!"]]
]
let inventory = db.collection("inventory")
let changeStream = try inventory.watch(pipeline, withEventType: BSONDocument.self)
let next = changeStream.next()

Tip

The _id field of the change stream event document acts as the resume token. Do not use the pipeline to modify or remove the change stream event's _id field.

Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.

See Change Events for more information on the change stream response document format.
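
Because a pipeline that alters _id breaks resumability, it can help to check a pipeline before opening the stream. The sketch below is a client-side heuristic in Python, not a driver feature: stages_touching_id is a hypothetical helper that inspects only the top-level keys of a few stage types and conservatively flags $replaceRoot and $replaceWith, since those replace the whole event document.

```python
def stages_touching_id(pipeline):
    """Return the indexes of stages that appear to modify or remove _id.

    Heuristic only: checks $addFields, $set, $project, and $unset for a
    top-level "_id" key, and flags $replaceRoot / $replaceWith outright.
    """
    flagged = []
    for index, stage in enumerate(pipeline):
        for name, spec in stage.items():
            if name in ("$replaceRoot", "$replaceWith"):
                flagged.append(index)
            elif name in ("$addFields", "$set") and "_id" in spec:
                flagged.append(index)
            elif name == "$project" and "_id" in spec:
                # {"_id": 1} keeps the field unchanged; anything else
                # removes or rewrites it.
                if spec["_id"] not in (1, True):
                    flagged.append(index)
            elif name == "$unset":
                fields = [spec] if isinstance(spec, str) else list(spec)
                if "_id" in fields:
                    flagged.append(index)
    return flagged
```

An empty result does not prove that a pipeline is safe; the server remains the authority and, starting in MongoDB 4.2, raises an error if _id is modified.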

Lookup Full Document for Update Operations

By default, change streams only return the delta of fields during the update operation. However, you can configure the change stream to return the most current majority-committed version of the updated document.



To return the most current majority-committed version of the updated document, pass the "fullDocument" option with the "updateLookup" value to the mongoc_collection_watch method.

In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.

BSON_APPEND_UTF8 (&opts, "fullDocument", "updateLookup");
stream = mongoc_collection_watch (collection, pipeline, &opts);
mongoc_change_stream_next (stream, &change);
if (mongoc_change_stream_error_document (stream, &error, NULL)) {
   MONGOC_ERROR ("%s\n", error.message);
}
mongoc_change_stream_destroy (stream);

To return the most current majority-committed version of the updated document, pass "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup" to the db.collection.watch() method.

In the example below, all update operations notifications include a FullDocument field that represents the current version of the document affected by the update operation.

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var cursor = inventory.Watch(options);
while (cursor.MoveNext() && cursor.Current.Count() == 0) { } // keep calling MoveNext until we've read the first batch
var next = cursor.Current.First();
cursor.Dispose();

To return the most current majority-committed version of the updated document, use the SetFullDocument(options.UpdateLookup) change stream option.

cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
require.NoError(t, err)
defer cs.Close(ctx)
ok := cs.Next(ctx)
next := cs.Current

To return the most current majority-committed version of the updated document, pass FullDocument.UPDATE_LOOKUP to the db.collection.watch.fullDocument() method.

In the example below, all update operations notifications include a FullDocument field that represents the current version of the document affected by the update operation.

cursor = inventory.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
next = cursor.next();

To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method.

In the example below, all update operations notifications include a full_document field that represents the current version of the document affected by the update operation.

cursor = db.inventory.watch(full_document="updateLookup")
document = await cursor.next()

To return the most current majority-committed version of the updated document, pass { fullDocument: 'updateLookup' } to the db.collection.watch() method.

In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.

The following example uses a stream to process the change events.

const collection = db.collection('inventory');
const changeStream = collection.watch([], { fullDocument: 'updateLookup' });
changeStream.on('change', next => {
  // process next document
});

Alternatively, you can use an iterator to process the change events:

const changeStreamIterator = collection.watch([], { fullDocument: 'updateLookup' });
const next = await changeStreamIterator.next();

To return the most current majority-committed version of the updated document, pass 'fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP to the db.collection.watch() method.

In the example below, all update operations notifications include a fullDocument field that represents the current version of the document affected by the update operation.

$changeStream = $db->inventory->watch([], ['fullDocument' => \MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]);
$changeStream->rewind();
$firstChange = $changeStream->current();
$changeStream->next();
$secondChange = $changeStream->current();

To return the most current majority-committed version of the updated document, pass full_document='updateLookup' to the db.collection.watch() method.

In the example below, all update operations notifications include a full_document field that represents the current version of the document affected by the update operation.

cursor = db.inventory.watch(full_document="updateLookup")
document = next(cursor)

To return the most current majority-committed version of the updated document, pass full_document: 'updateLookup' to the db.collection.watch() method.

In the example below, all update operations notifications include a full_document field that represents the current version of the document affected by the update operation.

cursor = inventory.watch([], full_document: 'updateLookup').to_enum
next_change = cursor.next

To return the most current majority-committed version of the updated document, pass options: ChangeStreamOptions(fullDocument: .updateLookup) to the watch() method.

let inventory = db.collection("inventory")
// Option 1: use next() to iterate
let next = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
    .flatMap { changeStream in
        changeStream.next()
    }
// Option 2: register a callback to execute for each document
let result = inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
    .flatMap { changeStream in
        changeStream.forEach { event in
            // process event
            print(event)
        }
    }

To return the most current majority-committed version of the updated document, pass options: ChangeStreamOptions(fullDocument: .updateLookup) to the watch() method.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()

Note

If there are one or more majority-committed operations that modified the updated document after the update operation but before the lookup, the full document returned may differ significantly from the document at the time of the update operation.

However, the deltas included in the change stream document always correctly describe the watched collection changes that applied to that change stream event.

See Change Events for more information on the change stream response document format.

Resume a Change Stream

Change streams are resumable by specifying a resume token to either resumeAfter or startAfter when opening the cursor.

resumeAfter for Change Streams

You can resume a change stream after a specific event by passing a resume token to resumeAfter when opening the cursor. For the resume token, use the _id value of the change stream event document. See Resume Tokens for more information on the resume token.

Important

  • The oplog must have enough history to locate the operation associated with the token or the timestamp, if the timestamp is in the past.

  • You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. Starting in MongoDB 4.2, you can use startAfter to start a new change stream after an invalidate event.
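
The resume mechanism can be sketched independently of any driver. In the Python sketch below, open_stream is a hypothetical callable that returns an iterable of change events (with PyMongo it could wrap collection.watch(resume_after=...)), and the built-in ConnectionError stands in for whatever exception a real driver raises. Both are assumptions for illustration.

```python
def watch_with_resume(open_stream, handle_event, max_restarts=3):
    """Consume change events, reopening the stream after a dropped connection.

    open_stream(resume_after) must return an iterable of change events.
    Returns the last resume token seen once the iterable is exhausted.
    """
    resume_token = None
    restarts = 0
    while True:
        try:
            for event in open_stream(resume_token):
                handle_event(event)
                # The event's _id is the resume token for this event.
                resume_token = event["_id"]
            return resume_token
        except ConnectionError:
            restarts += 1
            if restarts > max_restarts:
                raise
```

The token is recorded only after the handler succeeds, so an event whose handling was interrupted is delivered again after the stream is reopened. Persisting the token outside the process would extend the same idea to application restarts.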

In the example below, the resumeAfter option is appended to the stream options to recreate the stream after it has been destroyed. Passing the _id to the change stream attempts to resume notifications starting after the operation specified.

stream = mongoc_collection_watch (collection, pipeline, NULL);
if (mongoc_change_stream_next (stream, &change)) {
   resume_token = mongoc_change_stream_get_resume_token (stream);
   BSON_APPEND_DOCUMENT (&opts, "resumeAfter", resume_token);

   mongoc_change_stream_destroy (stream);
   stream = mongoc_collection_watch (collection, pipeline, &opts);
   mongoc_change_stream_next (stream, &change);
   mongoc_change_stream_destroy (stream);
} else {
   if (mongoc_change_stream_error_document (stream, &error, NULL)) {
      MONGOC_ERROR ("%s\n", error.message);
   }

   mongoc_change_stream_destroy (stream);
}

In the example below, the resumeToken is retrieved from the last change stream document and passed to the Watch() method as an option. Passing the resumeToken to the Watch() method directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.

  var resumeToken = previousCursor.GetResumeToken();
  var options = new ChangeStreamOptions { ResumeAfter = resumeToken };
  var cursor = inventory.Watch(options);
  cursor.MoveNext();
  var next = cursor.Current.First();
  cursor.Dispose();

You can use ChangeStreamOptions.SetResumeAfter to specify the resume token for the change stream. If the resumeAfter option is set, the change stream resumes notifications after the operation specified in the resume token. SetResumeAfter takes a value that must resolve to a resume token, e.g. resumeToken in the example below.

resumeToken := original.ResumeToken()
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
require.NoError(t, err)
defer cs.Close(ctx)

ok = cs.Next(ctx)
result := cs.Current

You can use the resumeAfter() method to resume notifications after the operation specified in the resume token. The resumeAfter() method takes a value that must resolve to a resume token, e.g. resumeToken in the example below.

BsonDocument resumeToken = next.getResumeToken();
cursor = inventory.watch().resumeAfter(resumeToken).iterator();
next = cursor.next();

You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = await cursor.next()

You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.

const collection = db.collection('inventory');
const changeStream = collection.watch();
let newChangeStream;
changeStream.once('change', next => {
  const resumeToken = changeStream.resumeToken;
  changeStream.close();

  newChangeStream = collection.watch([], { resumeAfter: resumeToken });
  newChangeStream.on('change', next => {
    processChange(next);
  });
});

You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. $resumeToken in the example below.

$resumeToken = $changeStream->getResumeToken();
if ($resumeToken === null) {
    throw new \Exception('Resume token was not found');
}
$changeStream = $db->inventory->watch([], ['resumeAfter' => $resumeToken]);
$changeStream->rewind();
$firstChange = $changeStream->current();

You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = next(cursor)

You can use the resume_after modifier to resume notifications after the operation specified in the resume token. The resume_after modifier takes a value that must resolve to a resume token, e.g. resume_token in the example below.

change_stream = inventory.watch
cursor = change_stream.to_enum
next_change = cursor.next
resume_token = change_stream.resume_token

new_cursor = inventory.watch([], resume_after: resume_token).to_enum
resumed_change = new_cursor.next

You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.

let inventory = db.collection("inventory")
inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
    .flatMap { changeStream in
        changeStream.next().map { _ in
            changeStream.resumeToken
        }.always { _ in
            _ = changeStream.kill()
        }
    }.flatMap { resumeToken in
        inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken)).flatMap { newStream in
            newStream.forEach { event in
                // process event
                print(event)
            }
        }
    }

You can use the resumeAfter option to resume notifications after the operation specified in the resume token. The resumeAfter option takes a value that must resolve to a resume token, e.g. resumeToken in the example below.

let inventory = db.collection("inventory")
let changeStream = try inventory.watch(options: ChangeStreamOptions(fullDocument: .updateLookup))
let next = changeStream.next()
let resumeToken = changeStream.resumeToken
let resumedChangeStream = try inventory.watch(options: ChangeStreamOptions(resumeAfter: resumeToken))
let nextAfterResume = resumedChangeStream.next()

startAfter for Change Streams用于更改流

New in version 4.2.

You can start a new change stream after a specific event by passing a resume token to startAfter when opening the cursor. 您可以在特定事件后启动新的更改流,方法是在打开游标时将恢复标记传递给startAfterUnlike resumeAfter, startAfter can resume notifications after an invalidate event by creating a new change stream. resumeAfter不同,startAfter可以通过创建新的更改流在失效事件后恢复通知。For the resume token, use the _id value of the change stream event document. 对于恢复令牌,请使用变更流事件文档_id值。See Resume Tokens for more information on the resume token.有关恢复令牌的详细信息,请参阅恢复令牌

Important
  • The oplog must have enough history to locate the operation associated with the token or the timestamp, if the timestamp is in the past.

Resume Tokens

The _id value of a change stream event document acts as the resume token:

{
   "_data" : <BinData|hex string>
}
Tip

Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event's _id field.

The type of the resume token's _data field depends on the MongoDB version and, in some cases, on the feature compatibility version (fcv) at the time the change stream was opened or resumed. A later change in the fcv value does not affect the resume tokens of change streams that are already open:

MongoDB Version              Feature Compatibility Version    Resume Token _data Type
MongoDB 4.2 and later        "4.2" or "4.0"                   Hex-encoded string (v1)
MongoDB 4.0.7 and later      "4.0" or "3.6"                   Hex-encoded string (v1)
MongoDB 4.0.6 and earlier    "4.0"                            Hex-encoded string (v0)
MongoDB 4.0.6 and earlier    "3.6"                            BinData
MongoDB 3.6                  "3.6"                            BinData

With hex-encoded string resume tokens, you can compare and sort the resume tokens.
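
The following Python sketch illustrates that ordering. It assumes that ordinary string comparison of the _data values, with hex digits in a consistent letter case, is the intended comparison. The token values are short placeholders, not real resume tokens, and token_key is a name chosen for this example.

```python
# Placeholder hex strings standing in for real v1 resume tokens.
tokens = [
    {"_data": "8263B2"},
    {"_data": "8263A1"},
    {"_data": "8263AF"},
]


def token_key(token):
    """Sort key: the hex-encoded _data string itself."""
    return token["_data"]


ordered = sorted(tokens, key=token_key)   # oldest first
latest = max(tokens, key=token_key)       # most recent token

print([t["_data"] for t in ordered])
print(latest["_data"])
```

An application that tracks tokens from several sources could use the same key to decide which token is furthest along before resuming.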

Regardless of the fcv value, a 4.0 deployment can use either BinData resume tokens or hex string resume tokens to resume a change stream. As such, a 4.0 deployment can use a resume token from a change stream opened on a collection from a 3.6 deployment.

New resume token formats introduced in a MongoDB version cannot be consumed by earlier MongoDB versions.

Tip

MongoDB provides a "snippet", an extension to mongosh, that decodes hex-encoded resume tokens.

You can install and run the resumetoken snippet from mongosh:

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

You can also run resumetoken from the command line (without using mongosh) if npm is installed on your system:

npx mongodb-resumetoken-decoder <RESUME TOKEN>


Use Cases

Change streams can benefit architectures with reliant business systems, informing downstream systems once data changes are durable. For example, change streams can save time for developers when implementing Extract, Transform, and Load (ETL) services, cross-platform synchronization, collaboration functionality, and notification services.

Access Control

For deployments enforcing authentication and authorization:

  • To open a change stream against a specific collection, applications must have privileges that grant the changeStream and find actions on the corresponding collection.

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • To open a change stream on a single database, applications must have privileges that grant the changeStream and find actions on all non-system collections in the database.

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • To open a change stream on an entire deployment, applications must have privileges that grant the changeStream and find actions on all non-system collections for all databases in the deployment.

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
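
The three privilege documents above differ only in how much of the resource is left as an empty string. The Python sketch below builds them with one helper and wraps each in a createRole command document. The helper names, role names, and the store.inventory namespace are hypothetical, and nothing is sent to a server.

```python
def change_stream_privilege(db="", collection=""):
    """Build a privilege document granting find and changeStream.

    Empty strings widen the scope, matching the resource documents above:
    db="" covers all databases, collection="" covers all non-system collections.
    """
    return {
        "resource": {"db": db, "collection": collection},
        "actions": ["find", "changeStream"],
    }


def create_role_command(role_name, privilege):
    """Wrap one privilege in a createRole command document."""
    return {"createRole": role_name, "privileges": [privilege], "roles": []}


# Hypothetical role and namespace names.
collection_role = create_role_command(
    "watchInventory", change_stream_privilege("store", "inventory")
)
database_role = create_role_command("watchStore", change_stream_privilege("store"))
deployment_role = create_role_command("watchAll", change_stream_privilege())

print(collection_role)
```

With a driver such as PyMongo, a command document like these would typically be run through the database's command method. A role whose resource spans all databases would normally have to be created in the admin database.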

Event Notification

Change streams only notify on data changes that have persisted to a majority of data-bearing members in the replica set. This ensures that notifications are triggered only by majority-committed changes that are durable in failure scenarios.

For example, consider a 3-member replica set with a change stream cursor opened against the primary. If a client issues an insert operation, the change stream only notifies the application of the data change once that insert has persisted to a majority of data-bearing members.
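
The arithmetic behind the 3-member example can be written out as a toy model in Python. This is only an illustration of the majority rule, not how the server tracks its commit point, and the function names are invented for this sketch.

```python
def majority(data_bearing_members):
    """Smallest number of members that forms a majority."""
    return data_bearing_members // 2 + 1


def should_notify(members_persisted, data_bearing_members):
    """True once the write has persisted to a majority of data-bearing members."""
    return members_persisted >= majority(data_bearing_members)


# 3-member replica set: the insert reaches the primary first, then the secondaries.
for persisted in (1, 2, 3):
    print(persisted, should_notify(persisted, 3))
```

In this model the insert becomes visible to the change stream once two of the three members have persisted it.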

If an operation is associated with a transaction, the change event document includes the txnNumber and the lsid.

Collation

Starting in MongoDB 4.2, change streams use simple binary comparisons unless an explicit collation is provided. In earlier versions, change streams opened on a single collection (db.collection.watch()) would inherit that collection's default collation.

Change Streams and Orphan Documents

Starting in MongoDB 5.3, during chunk migration, change stream events are not generated for updates to orphaned documents.
