Watch for Changes监视更改
Open a Change Stream打开变更流
You can watch for changes in MongoDB using the 您可以使用watch()
method on the following objects:watch()
方法对以下对象监视MongoDB中的更改:
For each object, the 对于每个对象,watch()
method opens a change stream to emit change event documents when they occur.watch()
方法打开一个变更流,以便在发生变更时发出变更事件文档。
The watch()
method optionally takes an aggregation pipeline which consists of an array of aggregation stages as the first parameter. watch()
方法可选地将一个由聚合阶段数组组成的聚合管道作为第一个参数。The aggregation stages filter and transform the change events.聚合阶段筛选和转换更改事件。
In the following snippet, the 在下面的代码段中,$match
stage matches all change event documents with a runtime
value of less than 15, filtering all others out.$match
阶段匹配runtime
值小于15的所有更改事件文档,筛选掉所有其他文档。
const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = myColl.watch(pipeline);
The watch()
method accepts an options
object as the second parameter. watch()
方法接受一个options
对象作为第二个参数。Refer to the links at the end of this section for more information on the settings you can configure with this object.有关可以使用此对象配置的设置的详细信息,请参阅本节末尾的链接。
The watch()
method returns an instance of a ChangeStream.
watch()
方法返回ChangeStream
的一个实例。
You can read events from change streams by iterating over them or listening for events.您可以通过迭代或侦听事件来读取变更流中的事件。
Select the tab that corresponds to the way you want to read events from the change stream:选择与您希望从更改流中读取事件的方式相对应的选项卡:
Starting in version 4.12, 从4.12版本开始,ChangeStream
objects are async iterables. With this change, you can use for-await
loops to retrieve events from an open change stream:ChangeStream
对象是异步可迭代对象。通过此更改,您可以使用for-await
循环从打开的更改流中检索事件:
for await (const change of changeStream) {
console.log("Received change: ", change);
}
You can call methods on the 您可以调用ChangeStream
object such as:ChangeStream
对象上的方法,例如:
hasNext()
to check for remaining documents in the stream检查流中的剩余文档next()
to request the next document in the stream请求流中的下一个文档close()
to close the ChangeStream关闭变更流
You can attach listener functions to the 您可以通过调用ChangeStream
object by calling the on()
method. on()
方法将侦听器函数附加到ChangeStream
对象。This method is inherited from the Javascript 此方法继承自Javascript EventEmitter
class. EventEmitter
类。Pass the string 将字符串"change"
as the first parameter and your listener function as the second parameter as shown below:"change"
作为第一个参数,将侦听器函数作为第二个参数,如下所示:
changeStream.on("change", (changeEvent) => { /*your listener function你的监听器函数 */ });
The listener function triggers when a 侦听器函数在发出change
event is emitted. change
事件时触发。You can specify logic in the listener to process the change event document when it is received.您可以在侦听器中指定逻辑,以便在收到更改事件文档时对其进行处理。
You can control the change stream by calling 您可以通过调用pause()
to stop emitting events or resume()
to continue to emit events.pause()
停止发出事件或调用resume()
继续发出事件来控制更改流。
To stop processing change events, call the close() method on the 要停止处理更改事件,请在ChangeStream
instance. ChangeStream
实例上调用close()
方法。This closes the change stream and frees resources.这将关闭更改流并释放资源。
changeStream.close();
Using a 驱动程序不支持在ChangeStream
in EventEmitter
and Iterator
mode concurrently is not supported by the driver and causes an error. EventEmitter
和Iterator
模式下同时使用ChangeStream
,这会导致错误。This is to prevent undefined behavior, where the driver cannot guarantee which consumer receives documents first.这是为了防止未定义的行为,即驱动程序无法保证哪个消费者首先接收文档。
Examples实例
Iteration
The following example opens a change stream on the 以下示例在haikus
collection in the insertDB
database and prints change events as they occur:insertDB
数据库中打开haikus
集合上的更改流,并在更改事件发生时打印它们:
You can use this example to connect to an instance of MongoDB and interact with a database that contains sample data. 您可以使用此示例连接到MongoDB的实例,并与包含示例数据的数据库进行交互。To learn more about connecting to your MongoDB instance and loading a sample dataset, see the Usage Examples guide.要了解有关连接到MongoDB实例和加载示例数据集的更多信息,请参阅用法实例指南。
import { MongoClient } from "mongodb";
//Replace the uri string with your MongoDB deployment's connection string.将uri字符串替换为MongoDB部署的连接字符串。
const uri = "<connection string uri>";
const client = new MongoClient(uri);
let changeStream;
async function run() {
try {
const database = client.db("insertDB");
const haikus = database.collection("haikus");
//Open a Change Stream on the "haikus" collection打开“haikus(俳句)”集合的变更流
changeStream = haikus.watch();
//Print change events打印更改事件
for await (const change of changeStream) {
console.log("Received change:\n", change);
}
await changeStream.close();
} finally {
await client.close();
}
}
run().catch(console.dir);
Identical Code Snippets相同的代码段
The JavaScript and TypeScript code snippets above are identical. 上面的JavaScript和TypeScript代码片段是相同的。There are no TypeScript specific features of the driver relevant to this use case.驱动程序没有与此用例相关的TypeScript特定功能。
When you run this code and then make a change to the 当您运行此代码,然后对haikus
collection, such as performing an insert or delete operation, you can see the change event document printed in your terminal.haikus
集合进行更改(例如执行插入或删除操作)时,您可以看到打印在终端中的更改事件文档。
For example, if you insert a document to the collection, the code prints the following output:例如,如果将文档插入集合,则代码将打印以下输出:
Received change:
{
_id: {
_data: '...'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1675800603, i: 31 }),
fullDocument: {
_id: new ObjectId("..."),
...
},
ns: { db: 'insertDB', coll: 'haikus' },
documentKey: { _id: new ObjectId("...") }
}
Receive Full Documents From Updates接收来自更新的完整文档
Change events that contain information on update operations only return the modified fields by default rather than the full updated document. 默认情况下,包含更新操作信息的更改事件只返回修改后的字段,而不是完整更新的文档。You can configure your change stream to also return the most current version of the document by setting the 通过将选项对象的fullDocument
field of the options object to "updateLookup"
as follows:fullDocument
字段设置为"updateLookup"
,您可以将更改流配置为也返回文档的最新版本,如下所示:
const options = { fullDocument: "updateLookup" };
//This could be any pipeline.这可以是任何管道。
const pipeline = [];
const changeStream = myColl.watch(pipeline, options);
Listener Function侦听器函数
The following example opens a change stream on the 以下示例在haikus
collection in the insertDB
database. insertDB
数据库中打开一个关于haikus
集合的更改流。Let's create a listener function to receive and print change events that occur on the collection.让我们创建一个侦听器函数来接收和打印集合上发生的更改事件。
First, open the change stream on the collection and then define a listener on the change stream using the 首先,打开集合上的变更流,然后使用on()
method. on()
方法在变更流上定义一个侦听器。Once you set the listener, generate a change event by performing a change to the collection.设置侦听器后,通过对集合执行更改来生成更改事件。
To generate the change event on the collection, let's use the 要在集合上生成更改事件,让我们使用insertOne()
method to add a new document. insertOne()
方法添加一个新文档。Since 由于insertOne()
may run before the listener function can register, we use a timer, defined as simulateAsyncPause
to wait 1 second before executing the insert.insertOne()
可能在侦听器函数注册之前运行,因此我们使用一个计时器,定义为simulateAsyncPause
,在执行插入之前等待1秒。
We also use 我们还在插入文档后使用simulateAsyncPause
after the insertion of the document. simulateAsyncPause
。This provides ample time for the listener function to receive the change event and for the listener to complete its execution before closing the 这为侦听器函数提供了充足的时间来接收更改事件,并为侦听器在使用ChangeStream
instance using the close()
method.close()
方法关闭ChangeStream
实例之前完成其执行。
Reason to include timers包含计时器的原因
The timers used in this example are only for demonstration purposes. 本示例中使用的计时器仅用于演示目的。They make sure that there is enough time to register the listener and have the listener process the change event before exiting.他们确保有足够的时间注册侦听器,并让侦听器在退出之前处理更改事件。
import { MongoClient } from "mongodb";
//Replace the uri string with your MongoDB deployment's connection string.将uri字符串替换为MongoDB部署的连接字符串。
const uri = "<connection string uri>";
const client = new MongoClient(uri);
const simulateAsyncPause = () =>
new Promise(resolve => {
setTimeout(() => resolve(), 1000);
});
let changeStream;
async function run() {
try {
const database = client.db("insertDB");
const haikus = database.collection("haikus");
//open a Change Stream on the "haikus" collection打开“haikus(徘句)”集合的“变更流”
changeStream = haikus.watch();
//set up a listener when change events are emitted在发出更改事件时设置侦听器
changeStream.on("change", next => {
//process any change event处理任何更改事件
console.log("received a change to the collection: \t", next);
});
await simulateAsyncPause();
await myColl.insertOne({
title: "Record of a Shriveled Datum",
content: "No bytes, no problem. Just insert a document, in MongoDB",
});
await simulateAsyncPause();
await changeStream.close();
console.log("closed the change stream");
} finally {
await client.close();
}
}
run().catch(console.dir);
Identical Code Snippets相同的代码段
The JavaScript and TypeScript code snippets above are identical. 上面的JavaScript和TypeScript代码片段是相同的。There are no TypeScript specific features of the driver relevant to this use case.驱动程序没有与此用例相关的TypeScript特定功能。
Visit the following resources for additional material on the classes and methods mentioned on this page:有关本页中提到的类和方法的其他材料,请访问以下资源: