Callback API vs Core API回调API与核心API
The Callback API:回调API:
Starts a transaction, executes the specified operations, and commits (or aborts on error).启动事务,执行指定的操作,并提交(或在出现错误时中止)。Automatically incorporates error handling logic for自动为TransientTransactionErrorandUnknownTransactionCommitResult.TransientTransactionError和UnknownTransactionCommitResult合并错误处理逻辑。
The Core API:核心API:
Requires explicit call to start the transaction and commit the transaction.需要显式调用来启动事务并提交事务。Does not incorporate error handling logic for不包含TransientTransactionErrorandUnknownTransactionCommitResult, and instead provides the flexibility to incorporate custom error handling for these errors.TransientTransactionError和UnknownTransactionCommitResult的错误处理逻辑,而是提供了为这些错误包含自定义错误处理的灵活性。
Callback API
The callback API incorporates logic:回调API包含以下逻辑:
To retry the transaction as a whole if the transaction encounters a如果事务遇到TransientTransactionErrorerror.TransientTransactionError错误,则作为一个整体重试事务。To retry the commit operation if the commit encounters an如果提交遇到UnknownTransactionCommitResulterror.UnknownTransactionCommitResult错误,请重试提交操作。
Starting in MongoDB 6.2, the server does not retry the transaction if it receives a 从MongoDB 6.2开始,如果服务器收到TransactionTooLargeForCache error.TransactionTooLargeForCache错误,则不会重试事务。
Example示例
➤ Use the Select your language drop-down menu in the upper-right to set the language of the examples on this page.使用右上角的“选择语言”下拉菜单设置此页面上示例的语言。
C
Important
Use the MongoDB driver for your MongoDB version.为MongoDB版本使用MongoDB驱动程序。When using drivers, each operation in the transaction must pass the session to each operation.使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。Operations in a transaction use transaction-level read concern, transaction-level write concern, and transaction-level read preference.事务中的操作使用事务级读取关注、事务级写入关注和事务级读取首选项。You can create collections in transactions implicitly or explicitly. See Create Collections and Indexes in a Transaction.您可以隐式或显式地在事务中创建集合。请参见在事务中创建集合和索引。
The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). 该示例使用新的回调API处理事务,启动事务、执行指定的操作并提交(或在出现错误时中止)。The new callback API incorporates retry logic for 新回调API包含TransientTransactionError or UnknownTransactionCommitResult commit errors.TransientTransactionError或UnknownTransactionCommitResult提交错误的重试逻辑。
static bool
with_transaction_example(bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_write_concern_t *wc = NULL;
mongoc_collection_t *coll = NULL;
bool success = false;
bool ret = false;
bson_t *doc = NULL;
bson_t *insert_opts = NULL;
mongoc_client_session_t *session = NULL;
mongoc_transaction_opt_t *txn_opts = NULL;
/* For a replica set, include the replica set name and a seedlist of the
* members in the URI string; e.g.
* uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \
* "27017/?replicaSet=myRepl";
* client = mongoc_client_new (uri_repl);
* For a sharded cluster, connect to the mongos instances; e.g.
* uri_sharded =
* "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
* client = mongoc_client_new (uri_sharded);
*/
client = get_client();
/* Prereq: Create collections. Note Atlas connection strings include a majority write
* concern by default.
*/
wc = mongoc_write_concern_new();
mongoc_write_concern_set_wmajority(wc, 1000);
insert_opts = bson_new();
mongoc_write_concern_append(wc, insert_opts);
coll = mongoc_client_get_collection(client, "mydb1", "foo");
doc = BCON_NEW("abc", BCON_INT32(0));
ret = mongoc_collection_insert_one(coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
bson_destroy(doc);
mongoc_collection_destroy(coll);
coll = mongoc_client_get_collection(client, "mydb2", "bar");
doc = BCON_NEW("xyz", BCON_INT32(0));
ret = mongoc_collection_insert_one(coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
/* Step 1: Start a client session. */
session = mongoc_client_start_session(client, NULL /* opts */, error);
if (!session) {
goto fail;
}
/* Step 2: Optional. Define options to use for the transaction. */
txn_opts = mongoc_transaction_opts_new();
mongoc_transaction_opts_set_write_concern(txn_opts, wc);
/* Step 3: Use mongoc_client_session_with_transaction to start a transaction,
* execute the callback, and commit (or abort on error). */
ret = mongoc_client_session_with_transaction(session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error);
if (!ret) {
goto fail;
}
success = true;
fail:
bson_destroy(doc);
mongoc_collection_destroy(coll);
bson_destroy(insert_opts);
mongoc_write_concern_destroy(wc);
mongoc_transaction_opts_destroy(txn_opts);
mongoc_client_session_destroy(session);
mongoc_client_destroy(client);
return success;
}
/* Define the callback that specifies the sequence of operations to perform
* inside the transactions. */
static bool
callback(mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_collection_t *coll = NULL;
bson_t *doc = NULL;
bool success = false;
bool ret = false;
BSON_UNUSED(ctx);
client = mongoc_client_session_get_client(session);
coll = mongoc_client_get_collection(client, "mydb1", "foo");
doc = BCON_NEW("abc", BCON_INT32(1));
ret = mongoc_collection_insert_one(coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
bson_destroy(doc);
mongoc_collection_destroy(coll);
coll = mongoc_client_get_collection(client, "mydb2", "bar");
doc = BCON_NEW("xyz", BCON_INT32(999));
ret = mongoc_collection_insert_one(coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
success = true;
fail:
mongoc_collection_destroy(coll);
bson_destroy(doc);
return success;
}C++11
Important
Use the MongoDB driver for your MongoDB version.为MongoDB版本使用MongoDB驱动程序。When using drivers, each operation in the transaction must pass the session to each operation.使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。Operations in a transaction use transaction-level read concern, transaction-level write concern, and transaction-level read preference.事务中的操作使用事务级读取关注、事务级写入关注和事务级读取首选项。You can create collections in transactions implicitly or explicitly. See Create Collections and Indexes in a Transaction.您可以隐式或显式地在事务中创建集合。请参见在事务中创建集合和索引。
The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). 该示例使用新的回调API处理事务,启动事务、执行指定的操作并提交(或在出现错误时中止)。The new callback API incorporates retry logic for 新回调API包含TransientTransactionError or UnknownTransactionCommitResult commit errors.TransientTransactionError或UnknownTransactionCommitResult提交错误的重试逻辑。
// The mongocxx::instance constructor and destructor initialize and shut down the driver,
// respectively. Therefore, a mongocxx::instance must be created before using the driver and
// must remain alive for as long as the driver is in use.
mongocxx::instance inst{};
// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g.
// uriString =
// 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
mongocxx::client client{mongocxx::uri{}};
// Prepare to set majority write explicitly. Note: on Atlas deployments this won't always be
// needed. The suggested Atlas connection string includes majority write concern by default.
write_concern wc_majority{};
wc_majority.acknowledge_level(write_concern::level::k_majority);
// Prereq: Create collections.
auto foo = client["mydb1"]["foo"];
auto bar = client["mydb2"]["bar"];
try {
options::insert opts;
opts.write_concern(wc_majority);
foo.insert_one(make_document(kvp("abc", 0)), opts);
bar.insert_one(make_document(kvp("xyz", 0)), opts);
} catch (mongocxx::exception const& e) {
std::cout << "An exception occurred while inserting: " << e.what() << std::endl;
return EXIT_FAILURE;
}
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transactions.
client_session::with_transaction_cb callback = [&](client_session* session) {
// Important:: You must pass the session to the operations.
foo.insert_one(*session, make_document(kvp("abc", 1)));
bar.insert_one(*session, make_document(kvp("xyz", 999)));
};
// Step 2: Start a client session
auto session = client.start_session();
// Step 3: Use with_transaction to start a transaction, execute the callback,
// and commit (or abort on error).
try {
options::transaction opts;
opts.write_concern(wc_majority);
session.with_transaction(callback, opts);
} catch (mongocxx::exception const& e) {
std::cout << "An exception occurred: " << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;C#
Important
Use the MongoDB driver for your MongoDB version.为MongoDB版本使用MongoDB驱动程序。When using drivers, each operation in the transaction must pass the session to each operation.使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。Operations in a transaction use transaction-level read concern, transaction-level write concern, and transaction-level read preference.事务中的操作使用事务级读取关注、事务级写入关注和事务级读取首选项。You can create collections in transactions implicitly or explicitly. See Create Collections and Indexes in a Transaction.您可以隐式或显式地在事务中创建集合。请参见在事务中创建集合和索引。
The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl";
// For a sharded cluster, connect to the mongos instances; e.g.
// string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
var client = new MongoClient(connectionString);
// Prereq: Create collections.
var database1 = client.GetDatabase("mydb1");
var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority);
collection1.InsertOne(new BsonDocument("abc", 0));
var database2 = client.GetDatabase("mydb2");
var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority);
collection2.InsertOne(new BsonDocument("xyz", 0));
// Step 1: Start a client session.
using (var session = client.StartSession())
{
// Step 2: Optional. Define options to use for the transaction.
var transactionOptions = new TransactionOptions(
writeConcern: WriteConcern.WMajority);
// Step 3: Define the sequence of operations to perform inside the transactions
var cancellationToken = CancellationToken.None; // normally a real token would be used
result = session.WithTransaction(
(s, ct) =>
{
try
{
collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct);
collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct);
}
catch (MongoWriteException)
{
// Do something in response to the exception
throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur.
}
return "Inserted into collections in different databases";
},
transactionOptions,
cancellationToken);
}Go
The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.
Important
- Use the MongoDB driver for your MongoDB version.
- When using drivers, each operation in the transaction must pass the session to each operation.
- Operations in a transaction use transaction-level read concern, transaction-level write concern, and transaction-level read preference.
- You can create collections in transactions implicitly or explicitly. See Create Collections and Indexes in a Transaction.
// WithTransactionExample is an example of using the Session.WithTransaction function.
func WithTransactionExample(ctx context.Context) error {
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"
// For a sharded cluster, connect to the mongos instances; e.g.
// uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"
uri := mtest.ClusterURI()
clientOpts := options.Client().ApplyURI(uri)
client, err := mongo.Connect(clientOpts)
if err != nil {
return err
}
defer func() { _ = client.Disconnect(ctx) }()
// Prereq: Create collections.
wcMajority := writeconcern.Majority()
wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority)
fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts)
barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts)
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction.
callback := func(sesctx context.Context) (any, error) {
// Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the
// transaction.
if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil {
return nil, err
}
if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil {
return nil, err
}
return nil, nil
}
// Step 2: Start a session and run the callback using WithTransaction.
session, err := client.StartSession()
if err != nil {
return err
}
defer session.EndSession(ctx)
result, err := session.WithTransaction(ctx, callback)
if err != nil {
return err
}
log.Printf("result: %v\n", result)
return nil
}Java(Sync)
Important
- Use the MongoDB driver for your MongoDB version.
- When using drivers, each operation in the transaction must pass the session to each operation.
- Operations in a transaction use transaction-level read concern, transaction-level write concern, and transaction-level read preference.
- You can create collections in transactions implicitly or explicitly. See Create Collections and Indexes in a Transaction.
The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.
/*
For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl";
For a sharded cluster, connect to the mongos instances; e.g.
String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin";
*/
final MongoClient client = MongoClients.create(uri);
/*
Create collections.
*/
client.getDatabase("mydb1").getCollection("foo")
.withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("abc", 0));
client.getDatabase("mydb2").getCollection("bar")
.withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("xyz", 0));
/* Step 1: Start a client session. */
final ClientSession clientSession = client.startSession();
/* Step 2: Optional. Define options to use for the transaction. */
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.LOCAL)
.writeConcern(WriteConcern.MAJORITY)
.build();
/* Step 3: Define the sequence of operations to perform inside the transactions. */
TransactionBody txnBody = new TransactionBody<String>() {
public String execute() {
MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");
MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");
/*
Important:: You must pass the session to the operations..
*/
coll1.insertOne(clientSession, new Document("abc", 1));
coll2.insertOne(clientSession, new Document("xyz", 999));
return "Inserted into collections in different databases";
}
};
try {
/*
Step 4: Use .withTransaction() to start a transaction,
execute the callback, and commit (or abort on error).
*/
clientSession.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
// some error handling
} finally {
clientSession.close();
}Motor
Important
- Use the MongoDB driver for your MongoDB version.
- When using drivers, each operation in the transaction must pass the session to each operation.
- Operations in a transaction use transaction-level read concern, transaction-level write concern, and transaction-level read preference.
- You can create collections in transactions implicitly or explicitly. See Create Collections and Indexes in a Transaction.
The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = AsyncIOMotorClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
async def callback(my_session):
collection_one = my_session.client.mydb1.foo
collection_two = my_session.client.mydb2.bar
# Important:: You must pass the session to the operations.
await collection_one.insert_one({"abc": 1}, session=my_session)
await collection_two.insert_one({"xyz": 999}, session=my_session)
# Step 2: Start a client session.
async with await client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
await session.with_transaction(
callback,
read_concern=ReadConcern("local"),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY,
)Node.js
Important
Use the MongoDB driver for your MongoDB version.为MongoDB版本使用MongoDB驱动程序。When using drivers, each operation in the transaction must pass the session to each operation.使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。Operations in a transaction use transaction-level read concern, transaction-level write concern, and transaction-level read preference.事务中的操作使用事务级读取关注、事务级写入关注和事务级读取首选项。You can create collections in transactions implicitly or explicitly. See Create Collections and Indexes in a Transaction.您可以隐式或显式地在事务中创建集合。请参见在事务中创建集合和索引。
The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). 该示例使用新的回调API处理事务,启动事务、执行指定的操作并提交(或在出现错误时中止)。The new callback API incorporates retry logic for 新回调API包含TransientTransactionError or UnknownTransactionCommitResult commit errors.TransientTransactionError或UnknownTransactionCommitResult提交错误的重试逻辑。
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
const client = new MongoClient(uri);
await client.connect();
// Prereq: Create collections.
await client
.db('mydb1')
.collection('foo')
.insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } });
await client
.db('mydb2')
.collection('bar')
.insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } });
// Step 1: Start a Client Session
const session = client.startSession();
// Step 2: Optional. Define options to use for the transaction
const transactionOptions = {
readPreference: 'primary',
readConcern: { level: 'local' },
writeConcern: { w: 'majority' }
};
// Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error)
// Note: The callback for withTransaction MUST be async and/or return a Promise.
try {
await session.withTransaction(async () => {
const coll1 = client.db('mydb1').collection('foo');
const coll2 = client.db('mydb2').collection('bar');
// Important:: You must pass the session to the operations
await coll1.insertOne({ abc: 1 }, { session });
await coll2.insertOne({ xyz: 999 }, { session });
}, transactionOptions);
} finally {
await session.endSession();
await client.close();
}PHP
Important
Use the MongoDB driver for your MongoDB version.为MongoDB版本使用MongoDB驱动程序。- When using drivers, each operation in the transaction must pass the session to each operation.
- Operations in a transaction use transaction-level read concern, transaction-level write concern, and transaction-level read preference.
- You can create collections in transactions implicitly or explicitly. See Create Collections and Indexes in a Transaction.
The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.
/*
* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
* uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
* For a sharded cluster, connect to the mongos instances; e.g.
* uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
*/
$client = new \MongoDB\Client($uriString);
// Prerequisite: Create collections.
$client->selectCollection(
'mydb1',
'foo',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['abc' => 0]);
$client->selectCollection(
'mydb2',
'bar',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['xyz' => 0]);
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
$callback = function (\MongoDB\Driver\Session $session) use ($client): void {
$client
->selectCollection('mydb1', 'foo')
->insertOne(['abc' => 1], ['session' => $session]);
$client
->selectCollection('mydb2', 'bar')
->insertOne(['xyz' => 999], ['session' => $session]);
};
// Step 2: Start a client session.
$session = $client->startSession();
// Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
\MongoDB\with_transaction($session, $callback);Python
Important
- Use the MongoDB driver for your MongoDB version.
- When using drivers, each operation in the transaction must pass the session to each operation.
- Operations in a transaction use transaction-level read concern, transaction-level write concern, and transaction-level read preference.
- You can create collections in transactions implicitly or explicitly. See Create Collections and Indexes in a Transaction.
The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = MongoClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
def callback(session):
collection_one = session.client.mydb1.foo
collection_two = session.client.mydb2.bar
# Important:: You must pass the session to the operations.
collection_one.insert_one({"abc": 1}, session=session)
collection_two.insert_one({"xyz": 999}, session=session)
# Step 2: Start a client session.
with client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(callback)Ruby
Important
- Use the MongoDB driver for your MongoDB version.
- When using drivers, each operation in the transaction must pass the session to each operation.
- Operations in a transaction use transaction-level read concern, transaction-level write concern, and transaction-level read preference.
- You can create collections in transactions implicitly or explicitly. See Create Collections and Indexes in a Transaction.
The example uses the new callback API for working with transactions, which starts a transaction, executes the specified operations, and commits (or aborts on error). The new callback API incorporates retry logic for TransientTransactionError or UnknownTransactionCommitResult commit errors.
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000})
# Prereq: Create collections.
client.use('mydb1')['foo'].insert_one(abc: 0)
client.use('mydb2')['bar'].insert_one(xyz: 0)
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
callback = Proc.new do |my_session|
collection_one = client.use('mydb1')['foo']
collection_two = client.use('mydb2')['bar']
# Important: You must pass the session to the operations.
collection_one.insert_one({'abc': 1}, session: my_session)
collection_two.insert_one({'xyz': 999}, session: my_session)
end
#. Step 2: Start a client session.
session = client.start_session
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
read_concern: {level: :local},
write_concern: {w: :majority, wtimeout: 1000},
read: {mode: :primary},
&callback)scala
Note
For the Scala driver, see the Core API usage example instead.
Core API
The core transaction API does not incorporate retry logic for errors labeled:核心事务API不包含标记为以下错误的重试逻辑:
TransientTransactionError. If an operation in a transaction returns an error labeled如果事务中的操作返回标记为TransientTransactionError, the transaction as a whole can be retried.TransientTransactionError的错误,则可以重试整个事务。To handle为了处理TransientTransactionError, applications should explicitly incorporate retry logic for the error.TransientTransactionError,应用程序应该明确地包含错误的重试逻辑。UnknownTransactionCommitResult. If the commit returns an error labeled。如果提交返回标记为UnknownTransactionCommitResult, the commit can be retried.UnknownTransactionCommitResult的错误,则可以重试提交。To handle为了处理UnknownTransactionCommitResult, applications should explicitly incorporate retry logic for the error.UnknownTransactionCommitResult,应用程序应该明确地包含错误的重试逻辑。
Example示例
➤ Use the Select your language drop-down menu in the upper-right to set the language of the examples on this page.使用右上角的“选择语言”下拉菜单设置此页面上示例的语言。
The following example incorporates logic to retry the transaction for transient errors and retry the commit for unknown commit error:以下示例包含了针对暂时错误重试事务和针对未知提交错误重试提交的逻辑:
C
/* takes a session, an out-param for server reply, and out-param for error. */
typedef bool (*txn_func_t)(mongoc_client_session_t *, bson_t *, bson_error_t *);
/* runs transactions with retry logic */
bool
run_transaction_with_retry(txn_func_t txn_func, mongoc_client_session_t *cs, bson_error_t *error)
{
bson_t reply;
bool r;
while (true) {
/* perform transaction */
r = txn_func(cs, &reply, error);
if (r) {
/* success */
bson_destroy(&reply);
return true;
}
MONGOC_WARNING("Transaction aborted: %s", error->message);
if (mongoc_error_has_label(&reply, "TransientTransactionError")) {
/* on transient error, retry the whole transaction */
MONGOC_WARNING("TransientTransactionError, retrying transaction...");
bson_destroy(&reply);
} else {
/* non-transient error */
break;
}
}
bson_destroy(&reply);
return false;
}
/* commit transactions with retry logic */
bool
commit_with_retry(mongoc_client_session_t *cs, bson_error_t *error)
{
bson_t reply;
bool r;
while (true) {
/* commit uses write concern set at transaction start, see
* mongoc_transaction_opts_set_write_concern */
r = mongoc_client_session_commit_transaction(cs, &reply, error);
if (r) {
MONGOC_DEBUG("Transaction committed");
break;
}
if (mongoc_error_has_label(&reply, "UnknownTransactionCommitResult")) {
MONGOC_WARNING("UnknownTransactionCommitResult, retrying commit ...");
bson_destroy(&reply);
} else {
/* commit failed, cannot retry */
break;
}
}
bson_destroy(&reply);
return r;
}
/* updates two collections in a transaction and calls commit_with_retry */
bool
update_employee_info(mongoc_client_session_t *cs, bson_t *reply, bson_error_t *error)
{
mongoc_client_t *client;
mongoc_collection_t *employees;
mongoc_collection_t *events;
mongoc_read_concern_t *rc;
mongoc_write_concern_t *wc;
mongoc_transaction_opt_t *txn_opts;
bson_t opts = BSON_INITIALIZER;
bson_t *filter = NULL;
bson_t *update = NULL;
bson_t *event = NULL;
bool r;
bson_init(reply);
client = mongoc_client_session_get_client(cs);
employees = mongoc_client_get_collection(client, "hr", "employees");
events = mongoc_client_get_collection(client, "reporting", "events");
rc = mongoc_read_concern_new();
mongoc_read_concern_set_level(rc, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
wc = mongoc_write_concern_new();
mongoc_write_concern_set_w(
wc, MONGOC_WRITE_CONCERN_W_MAJORITY); /* Atlas connection strings include majority by default*/
txn_opts = mongoc_transaction_opts_new();
mongoc_transaction_opts_set_read_concern(txn_opts, rc);
mongoc_transaction_opts_set_write_concern(txn_opts, wc);
r = mongoc_client_session_start_transaction(cs, txn_opts, error);
if (!r) {
goto done;
}
r = mongoc_client_session_append(cs, &opts, error);
if (!r) {
goto done;
}
filter = BCON_NEW("employee", BCON_INT32(3));
update = BCON_NEW("$set", "{", "status", "Inactive", "}");
/* mongoc_collection_update_one will reinitialize reply */
bson_destroy(reply);
r = mongoc_collection_update_one(employees, filter, update, &opts, reply, error);
if (!r) {
goto abort;
}
event = BCON_NEW("employee", BCON_INT32(3));
BCON_APPEND(event, "status", "{", "new", "Inactive", "old", "Active", "}");
bson_destroy(reply);
r = mongoc_collection_insert_one(events, event, &opts, reply, error);
if (!r) {
goto abort;
}
r = commit_with_retry(cs, error);
abort:
if (!r) {
MONGOC_ERROR("Aborting due to error in transaction: %s", error->message);
mongoc_client_session_abort_transaction(cs, NULL);
}
done:
mongoc_collection_destroy(employees);
mongoc_collection_destroy(events);
mongoc_read_concern_destroy(rc);
mongoc_write_concern_destroy(wc);
mongoc_transaction_opts_destroy(txn_opts);
bson_destroy(&opts);
bson_destroy(filter);
bson_destroy(update);
bson_destroy(event);
return r;
}
void
example_func(mongoc_client_t *client)
{
mongoc_client_session_t *cs;
bson_error_t error;
bool r;
ASSERT(client);
cs = mongoc_client_start_session(client, NULL, &error);
if (!cs) {
MONGOC_ERROR("Could not start session: %s", error.message);
return;
}
r = run_transaction_with_retry(update_employee_info, cs, &error);
if (!r) {
MONGOC_ERROR("Could not update employee, permanent error: %s", error.message);
}
mongoc_client_session_destroy(cs);
}C++11
g transaction_func = std::function<void(client_session & session)>;
run_transaction_with_retry = [](transaction_func txn_func, client_session& session) {
while (true) {
try {
txn_func(session); // performs transaction.
break;
} catch (operation_exception const& oe) {
std::cout << "Transaction aborted. Caught exception during transaction." << std::endl;
// If transient error, retry the whole transaction.
if (oe.has_error_label("TransientTransactionError")) {
std::cout << "TransientTransactionError, retrying transaction ..." << std::endl;
continue;
} else {
throw oe;
}
}
}
commit_with_retry = [](client_session& session) {
while (true) {
try {
session.commit_transaction(); // Uses write concern set at transaction start.
std::cout << "Transaction committed." << std::endl;
break;
} catch (operation_exception const& oe) {
// Can retry commit
if (oe.has_error_label("UnknownTransactionCommitResult")) {
std::cout << "UnknownTransactionCommitResult, retrying commit operation ..." << std::endl;
continue;
} else {
std::cout << "Error during commit ..." << std::endl;
throw oe;
}
}
}
pdates two collections in a transaction
update_employee_info = [&](client_session& session) {
auto& client = session.client();
auto employees = client["hr"]["employees"];
auto events = client["reporting"]["events"];
options::transaction txn_opts;
read_concern rc;
rc.acknowledge_level(read_concern::level::k_snapshot);
txn_opts.read_concern(rc);
write_concern wc;
wc.acknowledge_level(write_concern::level::k_majority);
txn_opts.write_concern(wc);
session.start_transaction(txn_opts);
try {
employees.update_one(
make_document(kvp("employee", 3)),
make_document(kvp("$set", make_document(kvp("status", "Inactive")))));
events.insert_one(make_document(
kvp("employee", 3), kvp("status", make_document(kvp("new", "Inactive"), kvp("old", "Active")))));
} catch (operation_exception const& oe) {
std::cout << "Caught exception during transaction, aborting." << std::endl;
session.abort_transaction();
throw oe;
}
commit_with_retry(session);
session = client.start_session();
{
run_transaction_with_retry(update_employee_info, session);
tch (operation_exception const& oe) {
// Do something with error.
throw oe;C#
public void RunTransactionWithRetry(Action<IMongoClient, IClientSessionHandle> txnFunc, IMongoClient client, IClientSessionHandle session)
{
while (true)
{
try
{
txnFunc(client, session); // performs transaction
break;
}
catch (MongoException exception)
{
// if transient error, retry the whole transaction
if (exception.HasErrorLabel("TransientTransactionError"))
{
Console.WriteLine("TransientTransactionError, retrying transaction.");
continue;
}
else
{
throw;
}
}
}
}
public void CommitWithRetry(IClientSessionHandle session)
{
while (true)
{
try
{
session.CommitTransaction();
Console.WriteLine("Transaction committed.");
break;
}
catch (MongoException exception)
{
// can retry commit
if (exception.HasErrorLabel("UnknownTransactionCommitResult"))
{
Console.WriteLine("UnknownTransactionCommitResult, retrying commit operation");
continue;
}
else
{
Console.WriteLine($"Error during commit: {exception.Message}.");
throw;
}
}
}
}
// updates two collections in a transaction
public void UpdateEmployeeInfo(IMongoClient client, IClientSessionHandle session)
{
var employeesCollection = client.GetDatabase("hr").GetCollection<BsonDocument>("employees");
var eventsCollection = client.GetDatabase("reporting").GetCollection<BsonDocument>("events");
session.StartTransaction(new TransactionOptions(
readConcern: ReadConcern.Snapshot,
writeConcern: WriteConcern.WMajority));
try
{
employeesCollection.UpdateOne(
session,
Builders<BsonDocument>.Filter.Eq("employee", 3),
Builders<BsonDocument>.Update.Set("status", "Inactive"));
eventsCollection.InsertOne(
session,
new BsonDocument
{
{ "employee", 3 },
{ "status", new BsonDocument { { "new", "Inactive" }, { "old", "Active" } } }
});
}
catch (Exception exception)
{
Console.WriteLine($"Caught exception during transaction, aborting: {exception.Message}.");
session.AbortTransaction();
throw;
}
CommitWithRetry(session);
}
public void UpdateEmployeeInfoWithTransactionRetry(IMongoClient client)
{
// start a session
using (var session = client.StartSession())
{
try
{
RunTransactionWithRetry(UpdateEmployeeInfo, client, session);
}
catch (Exception exception)
{
// do something with error
Console.WriteLine($"Non transient exception caught during transaction: ${exception.Message}.");
}
}
}Go
runTransactionWithRetry := func(ctx context.Context, txnFn func(context.Context) error) error {
for {
err := txnFn(ctx) // Performs transaction.
if err == nil {
return nil
}
log.Println("Transaction aborted. Caught exception during transaction.")
// If transient error, retry the whole transaction
if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") {
log.Println("TransientTransactionError, retrying transaction...")
continue
}
return err
}
}
commitWithRetry := func(ctx context.Context) error {
sess := mongo.SessionFromContext(ctx)
for {
err := sess.CommitTransaction(ctx)
switch e := err.(type) {
case nil:
log.Println("Transaction committed.")
return nil
case mongo.CommandError:
// Can retry commit
if e.HasErrorLabel("UnknownTransactionCommitResult") {
log.Println("UnknownTransactionCommitResult, retrying commit operation...")
continue
}
log.Println("Error during commit...")
return e
default:
log.Println("Error during commit...")
return e
}
}
}
// Updates two collections in a transaction.
updateEmployeeInfo := func(ctx context.Context) error {
employees := client.Database("hr").Collection("employees")
events := client.Database("reporting").Collection("events")
sess := mongo.SessionFromContext(ctx)
err := sess.StartTransaction(options.Transaction().
SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.Majority()),
)
if err != nil {
return err
}
_, err = employees.UpdateOne(ctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}})
if err != nil {
sess.AbortTransaction(ctx)
log.Println("caught exception during transaction, aborting.")
return err
}
_, err = events.InsertOne(ctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}})
if err != nil {
sess.AbortTransaction(ctx)
log.Println("caught exception during transaction, aborting.")
return err
}
return commitWithRetry(ctx)
}
txnOpts := options.Transaction().SetReadPreference(readpref.Primary())
return client.UseSessionWithOptions(
ctx, options.Session().SetDefaultTransactionOptions(txnOpts),
func(ctx context.Context) error {
return runTransactionWithRetry(ctx, updateEmployeeInfo)
},
)
}Java(Sync)
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.
void runTransactionWithRetry(Runnable transactional) {
while (true) {
try {
transactional.run();
break;
} catch (MongoException e) {
System.out.println("Transaction aborted. Caught exception during transaction.");
if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
System.out.println("TransientTransactionError, aborting transaction and retrying ...");
continue;
} else {
throw e;
}
}
}
}
void commitWithRetry(ClientSession clientSession) {
while (true) {
try {
clientSession.commitTransaction();
System.out.println("Transaction committed");
break;
} catch (MongoException e) {
// can retry commit
if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
System.out.println("UnknownTransactionCommitResult, retrying commit operation ...");
continue;
} else {
System.out.println("Exception during commit ...");
throw e;
}
}
}
}
void updateEmployeeInfo() {
MongoCollection<Document> employeesCollection = client.getDatabase("hr").getCollection("employees");
MongoCollection<Document> eventsCollection = client.getDatabase("reporting").getCollection("events");
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.MAJORITY)
.writeConcern(WriteConcern.MAJORITY)
.build();
try (ClientSession clientSession = client.startSession()) {
clientSession.startTransaction(txnOptions);
employeesCollection.updateOne(clientSession,
Filters.eq("employee", 3),
Updates.set("status", "Inactive"));
eventsCollection.insertOne(clientSession,
new Document("employee", 3).append("status", new Document("new", "Inactive").append("old", "Active")));
commitWithRetry(clientSession);
}
}
void updateEmployeeInfoWithRetry() {
runTransactionWithRetry(this::updateEmployeeInfo);
}Motor
Note
For Motor, see the Callback API instead.
Node.js
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.要将读写操作与事务相关联,您必须将会话传递给事务中的每个操作。
async function commitWithRetry(session) {
try {
await session.commitTransaction();
console.log('Transaction committed.');
} catch (error) {
if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
console.log('UnknownTransactionCommitResult, retrying commit operation ...');
await commitWithRetry(session);
} else {
console.log('Error during commit ...');
throw error;
}
}
}
async function runTransactionWithRetry(txnFunc, client, session) {
try {
await txnFunc(client, session);
} catch (error) {
console.log('Transaction aborted. Caught exception during transaction.');
// If transient error, retry the whole transaction如果出现暂时性错误,请重试整个事务
if (error.hasErrorLabel('TransientTransactionError')) {
console.log('TransientTransactionError, retrying transaction ...');
await runTransactionWithRetry(txnFunc, client, session);
} else {
throw error;
}
}
}
async function updateEmployeeInfo(client, session) {
session.startTransaction({
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority' },
readPreference: 'primary'
});
const employeesCollection = client.db('hr').collection('employees');
const eventsCollection = client.db('reporting').collection('events');
await employeesCollection.updateOne(
{ employee: 3 },
{ $set: { status: 'Inactive' } },
{ session }
);
await eventsCollection.insertOne(
{
employee: 3,
status: { new: 'Inactive', old: 'Active' }
},
{ session }
);
try {
await commitWithRetry(session);
} catch (error) {
await session.abortTransaction();
throw error;
}
}
return client.withSession(session =>
runTransactionWithRetry(updateEmployeeInfo, client, session)
);Perl
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.要将读写操作与事务相关联,您必须将会话传递给事务中的每个操作。
sub runTransactionWithRetry {
my ( $txnFunc, $session ) = @_;
LOOP: {
eval {
$txnFunc->($session); # performs transaction
};
if ( my $error = $@ ) {
print("Transaction aborted-> Caught exception during transaction.\n");
# If transient error, retry the whole transaction
if ( $error->has_error_label("TransientTransactionError") ) {
print("TransientTransactionError, retrying transaction ->..\n");
redo LOOP;
}
else {
die $error;
}
}
}
return;
}
sub commitWithRetry {
my ($session) = @_;
LOOP: {
eval {
$session->commit_transaction(); # Uses write concern set at transaction start.
print("Transaction committed->\n");
};
if ( my $error = $@ ) {
# Can retry commit
if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
print("UnknownTransactionCommitResult, retrying commit operation ->..\n");
redo LOOP;
}
else {
print("Error during commit ->..\n");
die $error;
}
}
}
return;
}
# Updates two collections in a transactions
sub updateEmployeeInfo {
my ($session) = @_;
my $employeesCollection = $session->client->ns("hr.employees");
my $eventsCollection = $session->client->ns("reporting.events");
$session->start_transaction(
{
readConcern => { level => "snapshot" },
writeConcern => { w => "majority" },
readPreference => 'primary',
}
);
eval {
$employeesCollection->update_one(
{ employee => 3 }, { '$set' => { status => "Inactive" } },
{ session => $session},
);
$eventsCollection->insert_one(
{ employee => 3, status => { new => "Inactive", old => "Active" } },
{ session => $session},
);
};
if ( my $error = $@ ) {
print("Caught exception during transaction, aborting->\n");
$session->abort_transaction();
die $error;
}
commitWithRetry($session);
}
# Start a session
my $session = $client->start_session();
eval {
runTransactionWithRetry(\&updateEmployeeInfo, $session);
};
if ( my $error = $@ ) {
# Do something with error
}
$session->end_session();PHP
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.
private function runTransactionWithRetry3(callable $txnFunc, \MongoDB\Client $client, \MongoDB\Driver\Session $session): void
{
while (true) {
try {
$txnFunc($client, $session); // performs transaction
break;
} catch (\MongoDB\Driver\Exception\CommandException $error) {
$resultDoc = $error->getResultDocument();
// If transient error, retry the whole transaction
if (isset($resultDoc->errorLabels) && in_array('TransientTransactionError', $resultDoc->errorLabels)) {
continue;
} else {
throw $error;
}
} catch (\MongoDB\Driver\Exception\Exception $error) {
throw $error;
}
}
}
private function commitWithRetry3(\MongoDB\Driver\Session $session): void
{
while (true) {
try {
$session->commitTransaction();
echo "Transaction committed.\n";
break;
} catch (\MongoDB\Driver\Exception\CommandException $error) {
$resultDoc = $error->getResultDocument();
if (isset($resultDoc->errorLabels) && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels)) {
echo "UnknownTransactionCommitResult, retrying commit operation ...\n";
continue;
} else {
echo "Error during commit ...\n";
throw $error;
}
} catch (\MongoDB\Driver\Exception\Exception $error) {
echo "Error during commit ...\n";
throw $error;
}
}
}
private function updateEmployeeInfo3(\MongoDB\Client $client, \MongoDB\Driver\Session $session): void
{
$session->startTransaction([
'readConcern' => new \MongoDB\Driver\ReadConcern('snapshot'),
'readPrefernece' => new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::PRIMARY),
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY),
]);
try {
$client->hr->employees->updateOne(
['employee' => 3],
['$set' => ['status' => 'Inactive']],
['session' => $session],
);
$client->reporting->events->insertOne(
['employee' => 3, 'status' => ['new' => 'Inactive', 'old' => 'Active']],
['session' => $session],
);
} catch (\MongoDB\Driver\Exception\Exception $error) {
echo "Caught exception during transaction, aborting.\n";
$session->abortTransaction();
throw $error;
}
$this->commitWithRetry3($session);
}
private function doUpdateEmployeeInfo(\MongoDB\Client $client): void
{
// Start a session.
$session = $client->startSession();
try {
$this->runTransactionWithRetry3([$this, 'updateEmployeeInfo3'], $client, $session);
} catch (\MongoDB\Driver\Exception\Exception) {
// Do something with error
}
}Python
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.
def run_transaction_with_retry(txn_func, session):
while True:
try:
txn_func(session) # performs transaction
break
except (ConnectionFailure, OperationFailure) as exc:
# If transient error, retry the whole transaction
if exc.has_error_label("TransientTransactionError"):
print("TransientTransactionError, retrying transaction ...")
continue
else:
raise
def commit_with_retry(session):
while True:
try:
# Commit uses write concern set at transaction start.
session.commit_transaction()
print("Transaction committed.")
break
except (ConnectionFailure, OperationFailure) as exc:
# Can retry commit
if exc.has_error_label("UnknownTransactionCommitResult"):
print("UnknownTransactionCommitResult, retrying commit operation ...")
continue
else:
print("Error during commit ...")
raise
# Updates two collections in a transactions
def update_employee_info(session):
employees_coll = session.client.hr.employees
events_coll = session.client.reporting.events
with session.start_transaction(
read_concern=ReadConcern("snapshot"),
write_concern=WriteConcern(w="majority"),
read_preference=ReadPreference.PRIMARY,
):
employees_coll.update_one(
{"employee": 3}, {"$set": {"status": "Inactive"}}, session=session
)
events_coll.insert_one(
{"employee": 3, "status": {"new": "Inactive", "old": "Active"}}, session=session
)
commit_with_retry(session)
# Start a session.
with client.start_session() as session:
try:
run_transaction_with_retry(update_employee_info, session)
except Exception:
# Do something with error.
raiseRuby
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.
def run_transaction_with_retry(session)
begin
yield session # performs transaction
rescue Mongo::Error => e
puts 'Transaction aborted. Caught exception during transaction.'
raise unless e.label?('TransientTransactionError')
puts "TransientTransactionError, retrying transaction ..."
retry
end
end
def commit_with_retry(session)
begin
session.commit_transaction
puts 'Transaction committed.'
rescue Mongo::Error => e
if e.label?('UnknownTransactionCommitResult')
puts "UnknownTransactionCommitResult, retrying commit operation ..."
retry
else
puts 'Error during commit ...'
raise
end
end
end
# updates two collections in a transaction
def update_employee_info(session)
employees_coll = session.client.use(:hr)[:employees]
events_coll = session.client.use(:reporting)[:events]
session.start_transaction(read_concern: { level: :snapshot },
write_concern: { w: :majority },
read: {mode: :primary})
employees_coll.update_one({ employee: 3 }, { '$set' => { status: 'Inactive'} },
session: session)
events_coll.insert_one({ employee: 3, status: { new: 'Inactive', old: 'Active' } },
session: session)
commit_with_retry(session)
end
session = client.start_session
begin
run_transaction_with_retry(session) do
update_employee_info(session)
end
rescue StandardError => e
# Do something with error
raise
endscala
Important
To associate read and write operations with a transaction, you must pass the session to each operation in the transaction.
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mongodb.scala
import org.mongodb.scala.model.{Filters, Updates}
import org.mongodb.scala.result.UpdateResult
import scala.concurrent.Await
import scala.concurrent.duration.Duration
//scalastyle:off magic.number
class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec {
// Implicit functions that execute the Observable and return the results
val waitDuration = Duration(5, "seconds")
implicit class ObservableExecutor[T](observable: Observable[T]) {
def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)
}
implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
def execute(): T = Await.result(observable.toFuture(), waitDuration)
}
// end implicit functions
"The Scala driver" should "be able to commit a transaction" in withClient { client =>
assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost())
client.getDatabase("hr").drop().execute()
client.getDatabase("hr").createCollection("employees").execute()
client.getDatabase("hr").createCollection("events").execute()
updateEmployeeInfoWithRetry(client).execute() should equal(Completed())
client.getDatabase("hr").drop().execute() should equal(Completed())
}
def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
observable.map(clientSession => {
val employeesCollection = database.getCollection("employees")
val eventsCollection = database.getCollection("events")
val transactionOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build()
clientSession.startTransaction(transactionOptions)
employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
.subscribe((res: UpdateResult) => println(res))
eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
.subscribe((res: Completed) => println(res))
clientSession
})
}
def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
println("UnknownTransactionCommitResult, retrying commit operation ...")
commitAndRetry(observable)
}
case e: Exception => {
println(s"Exception during commit ...: $e")
throw e
}
})
}
def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
println("TransientTransactionError, aborting transaction and retrying ...")
runTransactionAndRetry(observable)
}
})
}
def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {
val database = client.getDatabase("hr")
val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
val commitTransactionObservable: SingleObservable[Completed] =
updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
runTransactionAndRetry(commitAndRetryObservable)
}
}Driver Versions驱动程序版本
Transaction Error Handling事务错误处理
Regardless of the database system, whether MongoDB or relational databases, applications should take measures to handle errors during transaction commits and incorporate retry logic for transactions.无论数据库系统是MongoDB还是关系数据库,应用程序都应该采取措施处理事务提交过程中的错误,并为事务引入重试逻辑。
TransientTransactionError
The individual write operations inside the transaction are not retryable, regardless of the value of 无论retryWrites. retryWrites的值如何,事务中的单个写入操作都是不可重试的。If an operation encounters an error associated with the label 如果操作遇到与标签"TransientTransactionError", such as when the primary steps down, the transaction as a whole can be retried."TransientTransactionError"相关的错误,例如当主操作停止时,可以重试整个事务。
The callback API incorporates retry logic for回调API包含"TransientTransactionError"."TransientTransactionError"的重试逻辑。The core transaction API does not incorporate retry logic for核心事务API未包含"TransientTransactionError". To handle"TransientTransactionError", applications should explicitly incorporate retry logic for the error."TransientTransactionError"的重试逻辑。为了处理"TransientTransactionError",应用程序应该明确地包含错误的重试逻辑。To view an example that incorporates retry logic for transient errors, see Core API Example.要查看包含针对瞬态错误的重试逻辑的示例,请参阅核心API示例。
UnknownTransactionCommitResult
Commit operations are retryable write operations. 提交操作是可重试的写入操作。If the commit operation encounters an error, MongoDB drivers retry the commit regardless of the value of 如果提交操作遇到错误,MongoDB驱动程序将重试提交,而不管retryWrites.retryWrites的值是多少。
If the commit operation encounters an error labeled 如果提交操作遇到标记为"UnknownTransactionCommitResult", the commit can be retried."UnknownTransactionCommitResult"的错误,则可以重试提交。
The callback API incorporates retry logic for回调API包含"UnknownTransactionCommitResult"."UnknownTransactionCommitResult"的重试逻辑。The core transaction API does not incorporate retry logic for核心事务API未包含"UnknownTransactionCommitResult"."UnknownTransactionCommitResult"的重试逻辑。To handle为了处理"UnknownTransactionCommitResult", applications should explicitly incorporate retry logic for the error."UnknownTransactionCommitResult",应用程序应该明确地包含错误的重试逻辑。To view an example that incorporates retry logic for unknown commit errors, see Core API Example.要查看包含未知提交错误的重试逻辑的示例,请参阅核心API示例。
TransactionTooLargeForCache
New in version 6.2.在版本6.2中新增。
Starting in MongoDB 6.2, the server does not retry the transaction if it receives a 从MongoDB 6.2开始,如果服务器收到TransactionTooLargeForCache error. This error means the cache is too small and a retry is likely to fail.TransactionTooLargeForCache错误,则不会重试事务。此错误表示缓存太小,重试可能会失败。
The default value for the 事务transactionTooLargeForCacheThreshold threshold is 0.75. transactionTooLargeForCacheThreshold阈值的默认值为0.75。The server returns 当事务使用超过75%的缓存时,服务器返回TransactionTooLargeForCache instead of retrying the transaction when the transaction uses more than 75% of the cache.TransactionTooLargeForCache,而不是重试事务。
In earlier versions of MongoDB, the server returns 在早期版本的MongoDB中,服务器返回TemporarilyUnavailable or WriteConflict instead of TransactionTooLargeForCache.TemporaryUnavailable或WriteConflict,而不是TransactionTooLargeForCache。
Use the 使用setParameter command to modify the error threshold.setParameter命令修改错误阈值。
Additional Information附加信息
mongosh Example
The following 以下mongosh methods are available for transactions:mongosh方法可用于事务处理:
Note
The 为了简单起见,mongosh example omits retry logic and robust error handling for simplicity's sake. For a more practical example of incorporating transactions in applications, see Transaction Error Handling instead.mongosh示例省略了重试逻辑和健壮的错误处理。有关在应用程序中合并事务的更实际的示例,请参阅事务错误处理。
// Create collections:
db.getSiblingDB("mydb1").foo.insertOne(
{abc: 0},
{ writeConcern: { w: "majority", wtimeout: 2000 } }
)
db.getSiblingDB("mydb2").bar.insertOne(
{xyz: 0},
{ writeConcern: { w: "majority", wtimeout: 2000 } }
)
// Start a session.
session = db.getMongo().startSession( { readPreference: { mode: "primary" } } );
coll1 = session.getDatabase("mydb1").foo;
coll2 = session.getDatabase("mydb2").bar;
// Start a transaction
session.startTransaction( { readConcern: { level: "local" }, writeConcern: { w: "majority" } } );
// Operations inside the transaction
try {
coll1.insertOne( { abc: 1 } );
coll2.insertOne( { xyz: 999 } );
} catch (error) {
// Abort transaction on error
session.abortTransaction();
throw error;
}
// Commit the transaction using write concern set at transaction start
session.commitTransaction();
session.endSession();