Snapshot queries allow you to read data as it appeared at a single point in time in the recent past.
Starting in MongoDB 5.0, you can use read concern "snapshot" to query data on secondary nodes. This feature increases the versatility and resilience of your application's reads. You do not need to create a static copy of your data, move it out into a separate system, and manually isolate these long-running queries from interfering with your operational workload. Instead, you can perform long-running queries against a live, transactional database while reading from a consistent state of the data.
Using read concern "snapshot" on secondary nodes does not impact your application's write workload. Only application reads benefit from long-running queries being isolated to secondaries.
Use snapshot queries when you want to:
- Perform multiple related queries and ensure that each query reads data from the same point in time.
- Ensure that you read from a consistent state of the data from some point in the past.
Comparing Local and Snapshot Read Concerns
When MongoDB performs long-running queries using the default "local" read concern, the query results may contain data from writes that occur at the same time as the query. As a result, the query may return unexpected or inconsistent results.
To avoid this scenario, create a session and specify read concern "snapshot". With read concern "snapshot", MongoDB runs your query with snapshot isolation, meaning that your query reads data as it appeared at a single point in time in the recent past.
Examples
The examples on this page show how you can use snapshot queries to:
- Run Related Queries From the Same Point in Time
- Read from a Consistent State of the Data from Some Point in the Past
Run Related Queries From the Same Point in Time
Read concern "snapshot" lets you run multiple related queries within a session and ensure that each query reads data from the same point in time.
An animal shelter has a pets database that contains collections for each type of pet. The pets database has these collections:
- cats
- dogs
Each document in each collection contains an adoptable field, indicating whether the pet is available for adoption. For example, a document in the cats collection looks like this:
{
"name": "Whiskers",
"color": "white",
"age": 10,
"adoptable": true
}
You want to run a query to see the total number of pets available for adoption across all collections. To provide a consistent view of the data, you want to ensure that the data returned from each collection is from a single point in time.
To accomplish this goal, use read concern "snapshot" within a session:
C
mongoc_client_session_t *cs = NULL;
mongoc_collection_t *cats_collection = NULL;
mongoc_collection_t *dogs_collection = NULL;
int64_t adoptable_pets_count = 0;
bson_error_t error;
mongoc_session_opt_t *session_opts;
cats_collection = mongoc_client_get_collection(client, "pets", "cats");
dogs_collection = mongoc_client_get_collection(client, "pets", "dogs");
/* Seed 'pets.cats' and 'pets.dogs' with example data */
if (!pet_setup(cats_collection, dogs_collection)) {
goto cleanup;
}
/* start a snapshot session */
session_opts = mongoc_session_opts_new();
mongoc_session_opts_set_snapshot(session_opts, true);
cs = mongoc_client_start_session(client, session_opts, &error);
mongoc_session_opts_destroy(session_opts);
if (!cs) {
MONGOC_ERROR("Could not start session: %s", error.message);
goto cleanup;
}
/*
* Perform the following aggregation pipeline, and accumulate the count in
* `adoptable_pets_count`.
*
* adoptablePetsCount = db.cats.aggregate(
* [ { "$match": { "adoptable": true } },
* { "$count": "adoptableCatsCount" } ], session=s
* ).next()["adoptableCatsCount"]
*
* adoptablePetsCount += db.dogs.aggregate(
* [ { "$match": { "adoptable": True} },
* { "$count": "adoptableDogsCount" } ], session=s
* ).next()["adoptableDogsCount"]
*
* Remember in order to apply the client session to
* this operation, you must append the client session to the options passed
* to `mongoc_collection_aggregate`, i.e.,
*
* mongoc_client_session_append (cs, &opts, &error);
* cursor = mongoc_collection_aggregate (
* collection, MONGOC_QUERY_NONE, pipeline, &opts, NULL);
*/
accumulate_adoptable_count(cs, cats_collection, &adoptable_pets_count);
accumulate_adoptable_count(cs, dogs_collection, &adoptable_pets_count);
printf("there are %" PRId64 " adoptable pets\n", adoptable_pets_count);
C++11
using namespace mongocxx;
using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_document;
auto db = client["pets"];
int64_t adoptable_pets_count = 0;
auto opts = mongocxx::options::client_session{};
opts.snapshot(true);
auto session = client.start_session(opts);
{
pipeline p;
p.match(make_document(kvp("adoptable", true))).count("adoptableCatsCount");
auto cursor = db["cats"].aggregate(session, p);
for (auto doc : cursor) {
adoptable_pets_count += doc.find("adoptableCatsCount")->get_int32();
}
}
{
pipeline p;
p.match(make_document(kvp("adoptable", true))).count("adoptableDogsCount");
auto cursor = db["dogs"].aggregate(session, p);
for (auto doc : cursor) {
adoptable_pets_count += doc.find("adoptableDogsCount")->get_int32();
}
}
Go
ctx := context.TODO()
sess, err := client.StartSession(options.Session().SetSnapshot(true))
if err != nil {
return err
}
defer sess.EndSession(ctx)
var adoptablePetsCount int32
err = mongo.WithSession(ctx, sess, func(ctx context.Context) error {
// Count the adoptable cats
const adoptableCatsOutput = "adoptableCatsCount"
cursor, err := db.Collection("cats").Aggregate(ctx, mongo.Pipeline{
bson.D{{"$match", bson.D{{"adoptable", true}}}},
bson.D{{"$count", adoptableCatsOutput}},
})
if err != nil {
return err
}
if !cursor.Next(ctx) {
return fmt.Errorf("expected aggregate to return a document, but got none")
}
resp := cursor.Current.Lookup(adoptableCatsOutput)
adoptableCatsCount, ok := resp.Int32OK()
if !ok {
return fmt.Errorf("failed to find int32 field %q in document %v", adoptableCatsOutput, cursor.Current)
}
adoptablePetsCount += adoptableCatsCount
// Count the adoptable dogs
const adoptableDogsOutput = "adoptableDogsCount"
cursor, err = db.Collection("dogs").Aggregate(ctx, mongo.Pipeline{
bson.D{{"$match", bson.D{{"adoptable", true}}}},
bson.D{{"$count", adoptableDogsOutput}},
})
if err != nil {
return err
}
if !cursor.Next(ctx) {
return fmt.Errorf("expected aggregate to return a document, but got none")
}
resp = cursor.Current.Lookup(adoptableDogsOutput)
adoptableDogsCount, ok := resp.Int32OK()
if !ok {
return fmt.Errorf("failed to find int32 field %q in document %v", adoptableDogsOutput, cursor.Current)
}
adoptablePetsCount += adoptableDogsCount
return nil
})
if err != nil {
return err
}
Motor
db = client.pets
async with await client.start_session(snapshot=True) as s:
    adoptablePetsCount = 0
    docs = await db.cats.aggregate(
        [{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}], session=s
    ).to_list(None)
    adoptablePetsCount = docs[0]["adoptableCatsCount"]
    docs = await db.dogs.aggregate(
        [{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}], session=s
    ).to_list(None)
    adoptablePetsCount += docs[0]["adoptableDogsCount"]
    print(adoptablePetsCount)
PHP
$catsCollection = $client->selectCollection('pets', 'cats');
$dogsCollection = $client->selectCollection('pets', 'dogs');
$session = $client->startSession(['snapshot' => true]);
$adoptablePetsCount = $catsCollection->aggregate(
[
['$match' => ['adoptable' => true]],
['$count' => 'adoptableCatsCount'],
],
['session' => $session],
)->toArray()[0]->adoptableCatsCount;
$adoptablePetsCount += $dogsCollection->aggregate(
[
['$match' => ['adoptable' => true]],
['$count' => 'adoptableDogsCount'],
],
['session' => $session],
)->toArray()[0]->adoptableDogsCount;
var_dump($adoptablePetsCount);
Python
db = client.pets
with client.start_session(snapshot=True) as s:
    adoptablePetsCount = db.cats.aggregate(
        [{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}],
        session=s,
    ).next()["adoptableCatsCount"]
    adoptablePetsCount += db.dogs.aggregate(
        [{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}],
        session=s,
    ).next()["adoptableDogsCount"]
print(adoptablePetsCount)
Ruby
client = Mongo::Client.new(uri_string, database: "pets")
client.start_session(snapshot: true) do |session|
adoptable_pets_count = client['cats'].aggregate([
{ "$match": { "adoptable": true } },
{ "$count": "adoptable_cats_count" }
], session: session).first["adoptable_cats_count"]
adoptable_pets_count += client['dogs'].aggregate([
{ "$match": { "adoptable": true } },
{ "$count": "adoptable_dogs_count" }
], session: session).first["adoptable_dogs_count"]
puts adoptable_pets_count
end
The preceding series of commands:
- Uses MongoClient() to establish a connection to the MongoDB deployment.
- Switches to the pets database.
- Establishes a session. The command specifies snapshot=True, so the session uses read concern "snapshot".
- Performs these actions for each collection in the pets database: matches documents where adoptable is true, counts the matching documents, and adds the count to the adoptablePetsCount variable.
- Prints the adoptablePetsCount variable.
All queries within the session read data as it appeared at the same point in time. As a result, the final count reflects a consistent snapshot of the data.
Note
If the session lasts longer than the WiredTiger history retention period (300 seconds, by default), the query errors with a SnapshotTooOld error. To learn how to configure snapshot retention and enable longer-running queries, see Configure Snapshot Retention.
Read from a Consistent State of the Data from Some Point in the Past
Read concern "snapshot" ensures that your query reads data as it appeared at some single point in time in the recent past.
An online shoe store has a sales collection that contains data for each item sold at the store. For example, a document in the sales collection looks like this:
{
"shoeType": "boot",
"price": 30,
"saleDate": ISODate("2022-02-02T06:01:17.171Z")
}
Each day at midnight, a query runs to see how many pairs of shoes were sold that day. The daily sales query looks like this:
C
mongoc_client_session_t *cs = NULL;
mongoc_collection_t *sales_collection = NULL;
bson_error_t error;
mongoc_session_opt_t *session_opts;
bson_t *pipeline = NULL;
bson_t opts = BSON_INITIALIZER;
mongoc_cursor_t *cursor = NULL;
const bson_t *doc = NULL;
bool ok = true;
bson_iter_t iter;
int64_t total_sales = 0;
sales_collection = mongoc_client_get_collection(client, "retail", "sales");
/* seed 'retail.sales' with example data */
if (!retail_setup(sales_collection)) {
goto cleanup;
}
/* start a snapshot session */
session_opts = mongoc_session_opts_new();
mongoc_session_opts_set_snapshot(session_opts, true);
cs = mongoc_client_start_session(client, session_opts, &error);
mongoc_session_opts_destroy(session_opts);
if (!cs) {
MONGOC_ERROR("Could not start session: %s", error.message);
goto cleanup;
}
if (!mongoc_client_session_append(cs, &opts, &error)) {
MONGOC_ERROR("could not apply session options: %s", error.message);
goto cleanup;
}
pipeline = BCON_NEW("pipeline",
"[",
"{",
"$match",
"{",
"$expr",
"{",
"$gt",
"[",
"$saleDate",
"{",
"$dateSubtract",
"{",
"startDate",
"$$NOW",
"unit",
BCON_UTF8("day"),
"amount",
BCON_INT64(1),
"}",
"}",
"]",
"}",
"}",
"}",
"{",
"$count",
BCON_UTF8("totalDailySales"),
"}",
"]");
cursor = mongoc_collection_aggregate(sales_collection, MONGOC_QUERY_NONE, pipeline, &opts, NULL);
bson_destroy(&opts);
ok = mongoc_cursor_next(cursor, &doc);
if (mongoc_cursor_error(cursor, &error)) {
MONGOC_ERROR("could not get totalDailySales: %s", error.message);
goto cleanup;
}
if (!ok) {
MONGOC_ERROR("%s", "cursor has no results");
goto cleanup;
}
ok = bson_iter_init_find(&iter, doc, "totalDailySales");
if (ok) {
total_sales = bson_iter_as_int64(&iter);
} else {
MONGOC_ERROR("%s", "missing key: 'totalDailySales'");
goto cleanup;
}
C++11
using namespace mongocxx;
using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_array;
using bsoncxx::builder::basic::make_document;
auto opts = mongocxx::options::client_session{};
opts.snapshot(true);
auto session = client.start_session(opts);
auto db = client["retail"];
pipeline p;
p.match(make_document(kvp(
     "$expr",
     make_document(kvp(
         "$gt",
         make_array(
             "$saleDate",
             make_document(kvp(
                 "$dateSubtract",
                 make_document(
                     kvp("startDate", "$$NOW"), kvp("unit", "day"), kvp("amount", 1))))))))))
    .count("totalDailySales");
auto cursor = db["sales"].aggregate(session, p);
auto doc = *cursor.begin();
auto total_daily_sales = doc.find("totalDailySales")->get_int32();
Go
ctx := context.TODO()
sess, err := client.StartSession(options.Session().SetSnapshot(true))
if err != nil {
return err
}
defer sess.EndSession(ctx)
var totalDailySales int32
err = mongo.WithSession(ctx, sess, func(ctx context.Context) error {
// Count the total daily sales
const totalDailySalesOutput = "totalDailySales"
cursor, err := db.Collection("sales").Aggregate(ctx, mongo.Pipeline{
bson.D{{"$match",
bson.D{{"$expr",
bson.D{{"$gt",
bson.A{"$saleDate",
bson.D{{"$dateSubtract",
bson.D{
{"startDate", "$$NOW"},
{"unit", "day"},
{"amount", 1},
},
}},
},
}},
}},
}},
bson.D{{"$count", totalDailySalesOutput}},
})
if err != nil {
return err
}
if !cursor.Next(ctx) {
return fmt.Errorf("expected aggregate to return a document, but got none")
}
resp := cursor.Current.Lookup(totalDailySalesOutput)
var ok bool
totalDailySales, ok = resp.Int32OK()
if !ok {
return fmt.Errorf("failed to find int32 field %q in document %v", totalDailySalesOutput, cursor.Current)
}
return nil
})
if err != nil {
return err
}
Motor
db = client.retail
async with await client.start_session(snapshot=True) as s:
    docs = await db.sales.aggregate(
        [
            {
                "$match": {
                    "$expr": {
                        "$gt": [
                            "$saleDate",
                            {
                                "$dateSubtract": {
                                    "startDate": "$$NOW",
                                    "unit": "day",
                                    "amount": 1,
                                }
                            },
                        ]
                    }
                }
            },
            {"$count": "totalDailySales"},
        ],
        session=s,
    ).to_list(None)
    total = docs[0]["totalDailySales"]
    print(total)
PHP
$salesCollection = $client->selectCollection('retail', 'sales');
$session = $client->startSession(['snapshot' => true]);
$totalDailySales = $salesCollection->aggregate(
[
[
'$match' => [
'$expr' => [
'$gt' => ['$saleDate', [
'$dateSubtract' => [
'startDate' => '$$NOW',
'unit' => 'day',
'amount' => 1,
],
],
],
],
],
],
['$count' => 'totalDailySales'],
],
['session' => $session],
)->toArray()[0]->totalDailySales;
Python
db = client.retail
with client.start_session(snapshot=True) as s:
    total = db.sales.aggregate(
        [
            {
                "$match": {
                    "$expr": {
                        "$gt": [
                            "$saleDate",
                            {
                                "$dateSubtract": {
                                    "startDate": "$$NOW",
                                    "unit": "day",
                                    "amount": 1,
                                }
                            },
                        ]
                    }
                }
            },
            {"$count": "totalDailySales"},
        ],
        session=s,
    ).next()["totalDailySales"]
Ruby
client = Mongo::Client.new(uri_string, database: "retail")
client.start_session(snapshot: true) do |session|
total = client['sales'].aggregate([
{
"$match": {
"$expr": {
"$gt": [
"$saleDate",
{
"$dateSubtract": {
startDate: "$$NOW",
unit: "day",
amount: 1
}
}
]
}
}
},
{ "$count": "total_daily_sales" }
], session: session).first["total_daily_sales"]
end
The preceding query:
- Uses $match with $expr to specify a filter on the saleDate field.
- Uses the $gt operator and the $dateSubtract expression to return documents where the saleDate is greater than one day before the time the query is executed.
- Uses $count to return a count of the matching documents. The count is stored in the totalDailySales variable.
- Specifies read concern "snapshot" to ensure that the query reads from a single point in time.
The sales collection is quite large, and as a result this query may take a few minutes to run. Because the store is online, sales can occur at any time of day.
For example, consider if:
- The query begins executing at 12:00 AM.
- A customer buys three pairs of shoes at 12:02 AM.
- The query finishes executing at 12:04 AM.
If the query doesn't use read concern "snapshot", sales that occur between when the query starts and when it finishes can be included in the query count, despite not occurring on the day the report is for. This could result in inaccurate reports with some sales being counted twice.
By specifying read concern "snapshot", the query only returns data that was present in the database at a point in time shortly before the query started executing.
Note
If the query takes longer than the WiredTiger history retention period (300 seconds, by default), the query errors with a SnapshotTooOld error. To learn how to configure snapshot retention and enable longer-running queries, see Configure Snapshot Retention.
Configure Snapshot Retention
By default, the WiredTiger storage engine retains history for 300 seconds. You can use a session with snapshot=true for a total of 300 seconds from the time of the first operation in the session to the last. If you use the session for a longer period of time, the session fails with a SnapshotTooOld error. Similarly, if you query data using read concern "snapshot" and your query lasts longer than 300 seconds, the query fails.
If your query or session runs for longer than 300 seconds, consider increasing the snapshot retention period. To increase the retention period, modify the minSnapshotHistoryWindowInSeconds parameter.
For example, this command sets the value of minSnapshotHistoryWindowInSeconds to 600 seconds:
db.adminCommand( { setParameter: 1, minSnapshotHistoryWindowInSeconds: 600 } )
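To verify the new value, you can read the parameter back with the getParameter command (a sketch; the command returns the current value along with an ok field):

db.adminCommand( { getParameter: 1, minSnapshotHistoryWindowInSeconds: 1 } )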
Important
To modify minSnapshotHistoryWindowInSeconds for a MongoDB Atlas cluster, you must contact Atlas Support.
Disk Space and History
Increasing the value of minSnapshotHistoryWindowInSeconds increases disk usage because the server must maintain the history of older modified values within the specified time window. The amount of disk space used depends on your workload, with higher volume workloads requiring more disk space.