Docs Home / Node.js Driver

Monitor Data with Change Streams使用变更流监控数据

Overview概述

In this guide, you can learn how to use a change stream to monitor real-time changes to your database. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a collection, database, or deployment.在本指南中,您可以学习如何使用变更流来监视数据库的实时更改。变更流是MongoDB Server的一项功能,允许您的应用程序订阅集合、数据库或部署上的数据变更。

Tip

Atlas Stream ProcessingAtlas流处理

As an alternative to change streams, you can use Atlas Stream Processing to process and transform streams of data. Unlike change streams, which register only database events, Atlas Stream Processing manages multiple data event types and provides extended data processing capabilities. 作为更改流的替代方案,您可以使用Atlas流处理来处理和转换数据流。与仅注册数据库事件的变更流不同,Atlas Stream Processing管理多种数据事件类型,并提供扩展的数据处理功能。To learn more about this feature, see Atlas Stream Processing in the MongoDB Atlas documentation.要了解有关此功能的更多信息,请参阅MongoDB Atlas文档中的Atlas流处理

Open a Change Stream打开更改流

You can watch for changes in MongoDB using the watch() method on the following objects:您可以使用以下对象的watch()方法监视MongoDB中的更改:

For each object, the watch() method opens a change stream to emit change event documents when they occur.对于每个对象,watch()方法打开一个更改流,在发生更改事件时发出更改事件文档。

The watch() method optionally takes an aggregation pipeline which consists of an array of aggregation stages as the first parameter. The aggregation stages filter and transform the change events.watch()方法可选地将由聚合阶段数组组成的聚合管道作为第一个参数。聚合阶段筛选和转换更改事件。

In the following snippet, the $match stage matches all change event documents with a runtime value of less than 15, filtering all others out.在下面的代码片段中,$match阶段匹配runtime值小于15的所有更改事件文档,筛选掉所有其他文档。

const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = myColl.watch(pipeline);

The watch() method accepts an options object as the second parameter. Refer to the links at the end of this section for more information on the settings you can configure with this object.watch()方法接受options对象作为第二个参数。有关可以使用此对象配置的设置的更多信息,请参阅本节末尾的链接。

The watch() method returns an instance of a ChangeStream. You can read events from change streams by iterating over them or listening for events.watch()方法返回一个ChangeStream实例。您可以通过迭代或监听事件来从更改流中读取事件。

Warning

Using a ChangeStream in EventEmitter and Iterator mode concurrently is not supported by the driver and causes an error. This is to prevent undefined behavior, where the driver cannot guarantee which consumer receives documents first.驱动程序不支持在EventEmitterIterator模式下同时使用ChangeStream,并会导致错误。这是为了防止未定义的行为,即驱动程序无法保证哪个消费者首先收到文档。

Select the tab that corresponds to the way you want to read events from the change stream:选择与要从更改流中读取事件的方式相对应的选项卡:

Idiomatic Iteration习惯性迭代

Starting in version 4.12, ChangeStream objects are async iterables. With this change, you can use for-await loops to retrieve events from an open change stream:从4.12版本开始,ChangeStream对象是异步可迭代对象。通过此更改,您可以使用for-await循环从打开的更改流中检索事件:

for await (const change of changeStream) {
console.log("Received change: ", change);
}
Manual Iteration手动迭代

You can call methods on the ChangeStream object such as:您可以调用ChangeStream对象上的方法,例如:

  • hasNext() to check for remaining documents in the stream用来检查流中的剩余文档
  • next() to request the next document in the stream用来请求流中的下一个文档
  • close() to close the ChangeStream用来关闭ChangeStream
Event事件

You can attach listener functions to the ChangeStream object by calling the on() method. This method is inherited from the JavaScript EventEmitter class. 您可以通过调用on()方法将侦听器函数附加到ChangeStream对象。此方法继承自JavaScript EventEmitter类。Pass the string "change" as the first parameter and your listener function as the second parameter as shown below:将字符串"change"作为第一个参数传递,将监听器函数作为第二个参数传递如下:

changeStream.on("change", (changeEvent) => { /* your listener function */ });

The listener function triggers when a change event is emitted. You can specify logic in the listener to process the change event document when it is received.当发出change事件时,侦听器函数会触发。您可以在侦听器中指定逻辑,以便在收到更改事件文档时对其进行处理。

You can control the change stream by calling pause() to stop emitting events or resume() to continue to emit events.您可以通过调用pause()停止发出事件或resume()继续发出事件来控制更改流。

To stop processing change events, call the close() method on the ChangeStream instance. This closes the change stream and frees resources.要停止处理更改事件,请在ChangeStream实例上调用close()方法。这将关闭更改流并释放资源。

changeStream.close();

Examples示例

Iteration迭代

Note

You can use this example to connect to an instance of MongoDB and interact with a database that contains sample data. To learn more about connecting to your MongoDB instance and loading a sample dataset, see the Get Started with the Node.js Driver guide.您可以使用此示例连接到MongoDB的实例,并与包含示例数据的数据库进行交互。要了解有关连接到MongoDB实例和加载示例数据集的更多信息,请参阅Node.js驱动程序入门指南。

Note

No TypeScript Specific Features没有TypeScript特定功能

The following code example uses JavaScript. There are no TypeScript specific features of the driver relevant to this use case.以下代码示例使用JavaScript。驱动程序中没有与此用例相关的TypeScript特定功能。

The following example opens a change stream on the haikus collection in the insertDB database and prints change events as they occur:以下示例在insertDB数据库中的haikus集合上打开一个更改流,并在更改事件发生时打印出来:

/* Change stream listener更改流侦听器 */

import { MongoClient } from "mongodb";

// Replace the uri string with your MongoDB deployment's connection string将uri字符串替换为MongoDB部署的连接字符串
const uri = "<connection string uri>";

const client = new MongoClient(uri);

const simulateAsyncPause = () =>
new Promise(resolve => {
setTimeout(() => resolve(), 1000);
});

let changeStream;
async function run() {
try {
const database = client.db("insertDB");
const haikus = database.collection("haikus");

// Open a Change Stream on the "haikus" collection在“俳句”集合上打开更改流
changeStream = haikus.watch();

// Set up a change stream listener when change events are emitted在发出更改事件时设置更改流侦听器
changeStream.on("change", next => {
// Print any change event打印任何更改事件
console.log("received a change to the collection: \t", next);
});

// Pause before inserting a document插入文档前暂停
await simulateAsyncPause();

// Insert a new document into the collection在集合中插入新文档
await myColl.insertOne({
title: "Record of a Shriveled Datum",
content: "No bytes, no problem. Just insert a document, in MongoDB",
});

// Pause before closing the change stream关闭更改流前暂停
await simulateAsyncPause();

// Close the change stream and print a message to the console when it is closed关闭更改流,并在关闭时向控制台打印一条消息
await changeStream.close();
console.log("closed the change stream");
} finally {
// Close the database connection on completion or error完成或出错时关闭数据库连接
await client.close();
}
}
run().catch(console.dir);

Tip

Explicit Resource Management显式资源管理

The Node.js driver natively supports explicit resource management for MongoClient, ClientSession, ChangeStreams, and cursors. Node.js驱动程序原生支持MongoClientClientSessionChangeStreams和游标的显式资源管理。This feature is experimental and subject to change. To learn how to use explicit resource management, see the v6.9 Release Notes.此功能是实验性的,可能会发生变化。要了解如何使用显式资源管理,请参阅v6.9发行说明

When you run this code and then make a change to the haikus collection, such as performing an insert or delete operation, you can see the change event document printed in your terminal.当您运行此代码并对haikus集合进行更改时,例如执行插入或删除操作,您可以在终端中看到打印的更改事件文档。

For example, if you insert a document to the collection, the code prints the following output:例如,如果将文档插入到集合中,代码将打印以下输出:

Received change:
{
_id: {
_data: '...'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
fullDocument: {
_id: new ObjectId("..."),
...
},
ns: { db: 'insertDB', coll: 'haikus' },
documentKey: { _id: new ObjectId("...") }
}

Note

Receive Full Documents From Updates从更新中接收完整文档

Change events that contain information on update operations only return the modified fields by default rather than the full updated document. You can configure your change stream to also return the most current version of the document by setting the fullDocument field of the options object to "updateLookup" as follows:默认情况下,包含更新操作信息的更改事件仅返回修改后的字段,而不是完整的更新文档。您可以通过将options对象的fullDocument字段设置为"updateLookup"来配置更改流,以返回文档的最新版本,如下所示:

const options = { fullDocument: "updateLookup" };
// This could be any pipeline.
const pipeline = [];

const changeStream = myColl.watch(pipeline, options);

Listener Function侦听器函数

The following example opens a change stream on the haikus collection in the insertDB database. Let's create a listener function to receive and print change events that occur on the collection.以下示例在insertDB数据库中的haikus集合上打开一个更改流。让我们创建一个监听器函数来接收和打印集合上发生的更改事件。

First, open the change stream on the collection and then define a listener on the change stream using the on() method. Once you set the listener, generate a change event by performing a change to the collection.首先,打开集合上的更改流,然后使用on()方法在更改流上定义一个监听器。设置侦听器后,通过对集合执行更改来生成更改事件。

To generate the change event on the collection, let's use the insertOne() method to add a new document. Since insertOne() may run before the listener function can register, we use a timer, defined as simulateAsyncPause to wait 1 second before executing the insert.要在集合上生成更改事件,让我们使用insertOne()方法添加一个新文档。由于insertOne()可能会在侦听器函数注册之前运行,因此我们使用一个计时器,定义为simulateAsyncPause,在执行插入之前等待1秒。

We also use simulateAsyncPause after the insertion of the document. 在插入文档后,我们还使用simulateAsyncPauseThis provides ample time for the listener function to receive the change event and for the listener to complete its execution before closing the ChangeStream instance using the close() method.这为监听器函数接收更改事件以及监听器在使用close()方法关闭ChangeStream实例之前完成其执行提供了充足的时间。

Note

Reason to include timers包含计时器的原因

The timers used in this example are only for demonstration purposes. They make sure that there is enough time to register the listener and have the listener process the change event before exiting.本示例中使用的计时器仅用于演示目的。他们确保有足够的时间注册侦听器,并让侦听器在退出前处理更改事件。

Note

No TypeScript Specific Features没有TypeScript特定功能

The following code example uses JavaScript. There are no TypeScript specific features of the driver relevant to this use case.以下代码示例使用JavaScript。驱动程序中没有与此用例相关的TypeScript特定功能。

/* Change stream listener更改流侦听器 */

import { MongoClient } from "mongodb";

// Replace the uri string with your MongoDB deployment's connection string将uri字符串替换为MongoDB部署的连接字符串
const uri = "<connection string uri>";

const client = new MongoClient(uri);

const simulateAsyncPause = () =>
new Promise(resolve => {
setTimeout(() => resolve(), 1000);
});

let changeStream;
async function run() {
try {
const database = client.db("insertDB");
const haikus = database.collection("haikus");

// Open a Change Stream on the "haikus" collection在“俳句”集合上打开更改流
changeStream = haikus.watch();

// Set up a change stream listener when change events are emitted在发出更改事件时设置更改流侦听器
changeStream.on("change", next => {
// Print any change event打印任何更改事件
console.log("received a change to the collection: \t", next);
});

// Pause before inserting a document插入文档前暂停
await simulateAsyncPause();

// Insert a new document into the collection在集合中插入新文档
await myColl.insertOne({
title: "Record of a Shriveled Datum",
content: "No bytes, no problem. Just insert a document, in MongoDB",
});

// Pause before closing the change stream关闭更改流前暂停
await simulateAsyncPause();

// Close the change stream and print a message to the console when it is closed关闭更改流,并在关闭时向控制台打印一条消息
await changeStream.close();
console.log("closed the change stream");
} finally {
// Close the database connection on completion or error完成或出错时关闭数据库连接
await client.close();
}
}
run().catch(console.dir);

Visit the following resources for more material on the classes and methods mentioned on this page:访问以下资源,了解更多关于本页提到的类和方法的材料: