Docs HomeNode.js

Watch for Changes监视更改

Open a Change Stream打开变更流

You can watch for changes in MongoDB using the watch() method on the following objects:您可以使用watch()方法对以下对象监视MongoDB中的更改:

For each object, the watch() method opens a change stream to emit change event documents when they occur.对于每个对象,watch()方法打开一个变更流,以便在发生变更时发出变更事件文档。

The watch() method optionally takes an aggregation pipeline which consists of an array of aggregation stages as the first parameter. watch()方法可选地将一个由聚合阶段数组组成的聚合管道作为第一个参数。The aggregation stages filter and transform the change events.聚合阶段筛选和转换更改事件。

In the following snippet, the $match stage matches all change event documents with a runtime value of less than 15, filtering all others out.在下面的代码段中,$match阶段匹配runtime值小于15的所有更改事件文档,筛选掉所有其他文档。

const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = myColl.watch(pipeline);

The watch() method accepts an options object as the second parameter. watch()方法接受一个options对象作为第二个参数。Refer to the links at the end of this section for more information on the settings you can configure with this object.有关可以使用此对象配置的设置的详细信息,请参阅本节末尾的链接。

The watch() method returns an instance of a ChangeStream. watch()方法返回ChangeStream的一个实例。You can read events from change streams by iterating over them or listening for events.您可以通过迭代或侦听事件来读取变更流中的事件。

Select the tab that corresponds to the way you want to read events from the change stream:选择与您希望从更改流中读取事件的方式相对应的选项卡:

Starting in version 4.12, ChangeStream objects are async iterables. With this change, you can use for-await loops to retrieve events from an open change stream:从4.12版本开始,ChangeStream对象是异步可迭代对象。通过此更改,您可以使用for-await循环从打开的更改流中检索事件:

for await (const change of changeStream) {
console.log("Received change: ", change);
}

You can call methods on the ChangeStream object such as:您可以调用ChangeStream对象上的方法,例如:

  • hasNext() to check for remaining documents in the stream检查流中的剩余文档

  • next() to request the next document in the stream请求流中的下一个文档

  • close() to close the ChangeStream关闭变更流

You can attach listener functions to the ChangeStream object by calling the on() method. 您可以通过调用on()方法将侦听器函数附加到ChangeStream对象。This method is inherited from the Javascript EventEmitter class. 此方法继承自Javascript EventEmitter类。Pass the string "change" as the first parameter and your listener function as the second parameter as shown below:将字符串"change"作为第一个参数,将侦听器函数作为第二个参数,如下所示:

changeStream.on("change", (changeEvent) => { /* your listener function你的监听器函数 */ });

The listener function triggers when a change event is emitted. 侦听器函数在发出change事件时触发。You can specify logic in the listener to process the change event document when it is received.您可以在侦听器中指定逻辑,以便在收到更改事件文档时对其进行处理。

You can control the change stream by calling pause() to stop emitting events or resume() to continue to emit events.您可以通过调用pause()停止发出事件或调用resume()继续发出事件来控制更改流。

To stop processing change events, call the close() method on the ChangeStream instance. 要停止处理更改事件,请在ChangeStream实例上调用close()方法。This closes the change stream and frees resources.这将关闭更改流并释放资源。

changeStream.close();
Warning

Using a ChangeStream in EventEmitter and Iterator mode concurrently is not supported by the driver and causes an error. 驱动程序不支持在EventEmitterIterator模式下同时使用ChangeStream,这会导致错误。This is to prevent undefined behavior, where the driver cannot guarantee which consumer receives documents first.这是为了防止未定义的行为,即驱动程序无法保证哪个消费者首先接收文档。

Examples实例

Iteration

The following example opens a change stream on the haikus collection in the insertDB database and prints change events as they occur:以下示例在insertDB数据库中打开haikus集合上的更改流,并在更改事件发生时打印它们:

Note

You can use this example to connect to an instance of MongoDB and interact with a database that contains sample data. 您可以使用此示例连接到MongoDB的实例,并与包含示例数据的数据库进行交互。To learn more about connecting to your MongoDB instance and loading a sample dataset, see the Usage Examples guide.要了解有关连接到MongoDB实例和加载示例数据集的更多信息,请参阅用法实例指南

import { MongoClient } from "mongodb";

// Replace the uri string with your MongoDB deployment's connection string.将uri字符串替换为MongoDB部署的连接字符串。
const uri = "<connection string uri>";

const client = new MongoClient(uri);

let changeStream;
async function run() {
try {
const database = client.db("insertDB");
const haikus = database.collection("haikus");

// Open a Change Stream on the "haikus" collection打开“haikus(俳句)”集合的变更流
changeStream = haikus.watch();

// Print change events打印更改事件
for await (const change of changeStream) {
console.log("Received change:\n", change);
}

await changeStream.close();

} finally {
await client.close();
}
}
run().catch(console.dir);
Note

Identical Code Snippets相同的代码段

The JavaScript and TypeScript code snippets above are identical. 上面的JavaScript和TypeScript代码片段是相同的。There are no TypeScript specific features of the driver relevant to this use case.驱动程序没有与此用例相关的TypeScript特定功能。

When you run this code and then make a change to the haikus collection, such as performing an insert or delete operation, you can see the change event document printed in your terminal.当您运行此代码,然后对haikus集合进行更改(例如执行插入或删除操作)时,您可以看到打印在终端中的更改事件文档。

For example, if you insert a document to the collection, the code prints the following output:例如,如果将文档插入集合,则代码将打印以下输出:

Received change:
{
_id: {
_data: '...'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
fullDocument: {
_id: new ObjectId("..."),
...
},
ns: { db: 'insertDB', coll: 'haikus' },
documentKey: { _id: new ObjectId("...") }
}
Note

Receive Full Documents From Updates接收来自更新的完整文档

Change events that contain information on update operations only return the modified fields by default rather than the full updated document. 默认情况下,包含更新操作信息的更改事件只返回修改后的字段,而不是完整更新的文档。You can configure your change stream to also return the most current version of the document by setting the fullDocument field of the options object to "updateLookup" as follows:通过将选项对象的fullDocument字段设置为"updateLookup",您可以将更改流配置为也返回文档的最新版本,如下所示:

const options = { fullDocument: "updateLookup" };
// This could be any pipeline.这可以是任何管道。
const pipeline = [];

const changeStream = myColl.watch(pipeline, options);

Listener Function侦听器函数

The following example opens a change stream on the haikus collection in the insertDB database. 以下示例在insertDB数据库中打开一个关于haikus集合的更改流。Let's create a listener function to receive and print change events that occur on the collection.让我们创建一个侦听器函数来接收和打印集合上发生的更改事件。

First, open the change stream on the collection and then define a listener on the change stream using the on() method. 首先,打开集合上的变更流,然后使用on()方法在变更流上定义一个侦听器。Once you set the listener, generate a change event by performing a change to the collection.设置侦听器后,通过对集合执行更改来生成更改事件。

To generate the change event on the collection, let's use the insertOne() method to add a new document. 要在集合上生成更改事件,让我们使用insertOne()方法添加一个新文档。Since insertOne() may run before the listener function can register, we use a timer, defined as simulateAsyncPause to wait 1 second before executing the insert.由于insertOne()可能在侦听器函数注册之前运行,因此我们使用一个计时器,定义为simulateAsyncPause,在执行插入之前等待1秒。

We also use simulateAsyncPause after the insertion of the document. 我们还在插入文档后使用simulateAsyncPauseThis provides ample time for the listener function to receive the change event and for the listener to complete its execution before closing the ChangeStream instance using the close() method.这为侦听器函数提供了充足的时间来接收更改事件,并为侦听器在使用close()方法关闭ChangeStream实例之前完成其执行。

Note

Reason to include timers包含计时器的原因

The timers used in this example are only for demonstration purposes. 本示例中使用的计时器仅用于演示目的。They make sure that there is enough time to register the listener and have the listener process the change event before exiting.他们确保有足够的时间注册侦听器,并让侦听器在退出之前处理更改事件。

import { MongoClient } from "mongodb";

// Replace the uri string with your MongoDB deployment's connection string.将uri字符串替换为MongoDB部署的连接字符串。
const uri = "<connection string uri>";

const client = new MongoClient(uri);

const simulateAsyncPause = () =>
new Promise(resolve => {
setTimeout(() => resolve(), 1000);
});

let changeStream;
async function run() {
try {
const database = client.db("insertDB");
const haikus = database.collection("haikus");

// open a Change Stream on the "haikus" collection打开“haikus(徘句)”集合的“变更流”
changeStream = haikus.watch();

// set up a listener when change events are emitted在发出更改事件时设置侦听器
changeStream.on("change", next => {
// process any change event处理任何更改事件
console.log("received a change to the collection: \t", next);
});

await simulateAsyncPause();

await myColl.insertOne({
title: "Record of a Shriveled Datum",
content: "No bytes, no problem. Just insert a document, in MongoDB",
});

await simulateAsyncPause();

await changeStream.close();

console.log("closed the change stream");
} finally {
await client.close();
}
}
run().catch(console.dir);
Note

Identical Code Snippets相同的代码段

The JavaScript and TypeScript code snippets above are identical. 上面的JavaScript和TypeScript代码片段是相同的。There are no TypeScript specific features of the driver relevant to this use case.驱动程序没有与此用例相关的TypeScript特定功能。

Visit the following resources for additional material on the classes and methods mentioned on this page:有关本页中提到的类和方法的其他材料,请访问以下资源: