From e3b215d8ee160df64ebc2e16010eb097e60932d4 Mon Sep 17 00:00:00 2001 From: Bradley White <14679271+devbww@users.noreply.github.com> Date: Thu, 24 Oct 2019 12:36:42 -0400 Subject: [PATCH] feat!: replace RunTransaction() with Client::Commit(functor) (googleapis/google-cloud-cpp-spanner#975) Replace g:c:s:RunTransaction() with a new g:c:s:Client::Commit() overload. This is similar to the existing Commit(), but you specify how to create the mutations, instead of passing them directly, which is what is needed to create the rerun loop. The "mutator" callback no longer receives the target Client as an argument because many mutators don't need it. If you want a client in your callback, you will need to bind it yourself. The Transaction::ReadWriteOptions have been removed as there are no options, and being able to preallocate RW transactions pretty much requires that remains the case. Session affinity over reruns is still to be addressed. BREAKING CHANGE Fixes googleapis/google-cloud-cpp-spanner#557. --- google/cloud/spanner/client.cc | 141 +++++++++--------- google/cloud/spanner/client.h | 117 ++++++--------- google/cloud/spanner/client_test.cc | 76 +++++----- .../client_integration_test.cc | 120 +++++++-------- .../integration_tests/client_stress_test.cc | 12 +- .../rpc_failure_threshold_integration_test.cc | 5 +- .../integration_tests/throughput_benchmark.cc | 27 ++-- google/cloud/spanner/mutations.h | 2 +- google/cloud/spanner/samples/samples.cc | 39 ++--- 9 files changed, 246 insertions(+), 293 deletions(-) diff --git a/google/cloud/spanner/client.cc b/google/cloud/spanner/client.cc index b3f482fee87fa..0a9c48dde6227 100644 --- a/google/cloud/spanner/client.cc +++ b/google/cloud/spanner/client.cc @@ -135,6 +135,74 @@ StatusOr Client::ExecuteBatchDml( {std::move(transaction), std::move(statements)}); } +StatusOr Client::Commit( + std::function(Transaction)> const& mutator, + std::unique_ptr rerun_policy, + std::unique_ptr backoff_policy) { + // The status-code discriminator of TransactionRerunPolicy. + using RerunnablePolicy = internal::SafeTransactionRerun; + + for (int rerun = 0;; ++rerun) { + // TODO(#472): Make this transaction use the same session each time. + Transaction txn = MakeReadWriteTransaction(); + StatusOr mutations; +#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS + try { +#endif + mutations = mutator(txn); +#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS + } catch (...) { + auto rb_status = Rollback(txn); + if (!RerunnablePolicy::IsOk(rb_status)) { + GCP_LOG(WARNING) << "Rollback() failure in Client::Commit(): " + << rb_status.message(); + } + throw; + } +#endif + auto status = mutations.status(); + if (RerunnablePolicy::IsOk(status)) { + auto result = Commit(txn, *mutations); + status = result.status(); + if (!RerunnablePolicy::IsTransientFailure(status)) { + return result; + } + } else { + if (!RerunnablePolicy::IsTransientFailure(status)) { + auto rb_status = Rollback(txn); + if (!RerunnablePolicy::IsOk(rb_status)) { + GCP_LOG(WARNING) << "Rollback() failure in Client::Commit(): " + << rb_status.message(); + } + return status; + } + } + // A transient failure (i.e., kAborted), so consider rerunning. + if (!rerun_policy->OnFailure(status)) { + return status; // reruns exhausted + } + std::this_thread::sleep_for(backoff_policy->OnCompletion()); + } +} + +StatusOr Client::Commit( + std::function(Transaction)> const& mutator) { + auto const rerun_maximum_duration = std::chrono::minutes(10); + auto default_commit_rerun_policy = + LimitedTimeTransactionRerunPolicy(rerun_maximum_duration).clone(); + + auto const backoff_initial_delay = std::chrono::milliseconds(100); + auto const backoff_maximum_delay = std::chrono::minutes(5); + auto const backoff_scaling = 2.0; + auto default_commit_backoff_policy = + ExponentialBackoffPolicy(backoff_initial_delay, backoff_maximum_delay, + backoff_scaling) + .clone(); + + return Commit(mutator, std::move(default_commit_rerun_policy), + std::move(default_commit_backoff_policy)); +} + StatusOr Client::Commit(Transaction transaction, Mutations mutations) { return conn_->Commit({std::move(transaction), std::move(mutations)}); @@ -155,79 +223,6 @@ std::shared_ptr MakeConnection(Database const& db, return internal::MakeConnection(db, std::move(stub)); } -namespace { - -StatusOr RunTransactionImpl( - Client& client, Transaction::ReadWriteOptions const& opts, - std::function(Client, Transaction)> const& f) { - Transaction txn = MakeReadWriteTransaction(opts); - StatusOr mutations; -#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS - try { -#endif - mutations = f(client, txn); -#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS - } catch (...) { - auto status = client.Rollback(txn); - if (!status.ok()) { - GCP_LOG(WARNING) << "Rollback() failure in RunTransaction(): " - << status.message(); - } - throw; - } -#endif - if (!mutations) { - auto status = client.Rollback(txn); - if (!status.ok()) { - GCP_LOG(WARNING) << "Rollback() failure in RunTransaction(): " - << status.message(); - } - return mutations.status(); - } - return client.Commit(txn, *mutations); -} - -} // namespace - -namespace internal { - -StatusOr RunTransactionWithPolicies( - Client client, Transaction::ReadWriteOptions const& opts, - std::function(Client, Transaction)> const& f, - std::unique_ptr rerun_policy, - std::unique_ptr backoff_policy) { - Status last_status( - StatusCode::kFailedPrecondition, - "Retry policy should not be exhausted when retry loop starts"); - char const* reason = "Too many failures in "; - while (!rerun_policy->IsExhausted()) { - auto result = RunTransactionImpl(client, opts, f); - if (result) return result; - last_status = std::move(result).status(); - if (!rerun_policy->OnFailure(last_status)) { - if (internal::SafeTransactionRerun::IsPermanentFailure(last_status)) { - reason = "Permanent failure in "; - } - break; - } - std::this_thread::sleep_for(backoff_policy->OnCompletion()); - } - return internal::RetryLoopError(reason, __func__, last_status); -} - -std::unique_ptr DefaultRunTransactionRerunPolicy() { - return LimitedTimeTransactionRerunPolicy( - /*maximum_duration=*/std::chrono::minutes(10)) - .clone(); -} - -std::unique_ptr DefaultRunTransactionBackoffPolicy() { - return ExponentialBackoffPolicy(std::chrono::milliseconds(100), - std::chrono::minutes(5), 2.0) - .clone(); -} - -} // namespace internal } // namespace SPANNER_CLIENT_NS } // namespace spanner } // namespace cloud diff --git a/google/cloud/spanner/client.h b/google/cloud/spanner/client.h index bfb0c8a73b114..885acc23f5824 100644 --- a/google/cloud/spanner/client.h +++ b/google/cloud/spanner/client.h @@ -394,7 +394,7 @@ class Client { * * As with all read-write transactions, the results will not be visible * outside of the transaction until it is committed. For that reason, it is - * advisable to run this method with `RunTransaction`. + * advisable to run this method from a `Commit` mutator. * * @warning A returned status of OK from this function does not imply that * all the statements were executed successfully. For that, you need to @@ -413,13 +413,54 @@ class Client { /** * Commits a read-write transaction. * - * The commit might return an `ABORTED` error. This can occur at any time; - * commonly, the cause is conflicts with concurrent transactions. However, it - * can also happen for a variety of other reasons. If `Commit` returns - * `ABORTED`, the caller should re-attempt the transaction from the beginning, - * re-using the same session. + * Calls the @p mutator in the context of a new read-write transaction. + * The @p mutator can execute read/write operations using the transaction, + * and returns any additional `Mutations` to commit. * - * @warning It is an error to call `Commit` with a read-only `transaction`. + * If the @p mutator returns `StatusCode::kAborted` or the transaction commit + * results in an abort, then that transaction is rolled back and the process + * repeats (subject to @p rerun_policy and @p backoff_policy), by building a + * new transaction and re-running the @p mutator. The lock priority of the + * operation increases after each aborted transaction, meaning that the next + * attempt has a slightly better chance of success. + * + * If the @p mutator and the commit succeed, the `CommitResult` is returned. + * Otherwise the error returned by the @p mutator or the commit is returned. + * + * @param mutator the function called to create mutations + * @param rerun_policy controls for how long (or how many times) the mutator + * will be rerun after the transaction aborts. + * @param backoff_policy controls how long `Commit` waits between reruns. + */ + StatusOr Commit( + std::function(Transaction)> const& mutator, + std::unique_ptr rerun_policy, + std::unique_ptr backoff_policy); + + /** + * Commits a read-write transaction. + * + * Same as above, but uses the default rerun and backoff policies. + * + * @param mutator the function called to create mutations + */ + StatusOr Commit( + std::function(Transaction)> const& mutator); + + /** + * Commits a read-write transaction. + * + * The commit might return a `kAborted` error. This can occur at any time; + * commonly, the cause is conflicts with concurrent transactions. However, + * it can also happen for a variety of other reasons. If `Commit` returns + * `kAborted`, the caller may try to reapply the mutations within a new + * read-write transaction (which should use the same session to ensure an + * increase it lock priority). + * + * @note Prefer the previous `Commit` overloads if you want to simply reapply + * mutations after a `kAborted` error. + * + * @warning It is an error to call `Commit` with a read-only transaction. * * @param transaction The transaction to commit. * @param mutations The mutations to be executed when this transaction @@ -439,7 +480,7 @@ class Client { * that includes one or more `Read`, `ExecuteQuery`, or `ExecuteDml` requests * and ultimately decides not to commit. * - * @warning It is an error to call `Rollback` with a read-only `transaction`. + * @warning It is an error to call `Rollback` with a read-only transaction. * * @param transaction The transaction to roll back. * @@ -486,66 +527,6 @@ class Client { std::shared_ptr MakeConnection( Database const& db, ConnectionOptions const& options = ConnectionOptions()); -namespace internal { -/** - * Execute a function in the context of a read-write transaction, with - * automatic retries if the transaction commit results in an abort. - * - * The caller-provided function will be passed the `Client` argument and a - * newly created read-write `Transaction`. It should use these objects to - * issue any `Read()`s, `ExecuteQuery()`s, `ExecuteDml()`s, etc., and return the - * `Mutation`s to commit, or an error (which causes the transaction to be rolled - * back). - * - * The lock priority of the transaction increases after each prior aborted - * transaction, meaning that the next attempt has a slightly better chance - * of success than before. - * - * @param client how to contact Cloud Spanner - * @param opts options for the transaction created by this function - * @param f the function to call in the transaction. - * @param retry_policy controls for how long (or how many times) will the - * function retry the operation when there is a retryable failure. - * @param backoff_policy controls how long does the function wait between - * retries. - */ -StatusOr RunTransactionWithPolicies( - Client client, Transaction::ReadWriteOptions const& opts, - std::function(Client, Transaction)> const& f, - std::unique_ptr rerun_policy, - std::unique_ptr backoff_policy); - -/// The default rerun policy for RunTransaction() -std::unique_ptr DefaultRunTransactionRerunPolicy(); - -/// The default backoff policy for RunTransaction() -std::unique_ptr DefaultRunTransactionBackoffPolicy(); - -} // namespace internal - -/** - * Execute a function in the context of a read-write transaction, with - * automatic retries if the transaction commit results in an abort. - * - * The caller-provided function will be passed the `Client` argument and a - * newly created read-write `Transaction`. It should use these objects to - * issue any `Read()`s, `ExecuteQuery()`s, `ExecuteDml()`s, etc., and return - * the `Mutation`s to commit, or an error (which causes the transaction to be - * rolled back). - * - * The lock priority of the transaction increases after each prior aborted - * transaction, meaning that the next attempt has a slightly better chance - * of success than before. - */ -inline StatusOr RunTransaction( - Client client, Transaction::ReadWriteOptions const& opts, - std::function(Client, Transaction)> f) { - return internal::RunTransactionWithPolicies( - std::move(client), opts, std::move(f), - internal::DefaultRunTransactionRerunPolicy(), - internal::DefaultRunTransactionBackoffPolicy()); -} - } // namespace SPANNER_CLIENT_NS } // namespace spanner } // namespace cloud diff --git a/google/cloud/spanner/client_test.cc b/google/cloud/spanner/client_test.cc index 9d6ce27865149..97d835faa8ff3 100644 --- a/google/cloud/spanner/client_test.cc +++ b/google/cloud/spanner/client_test.cc @@ -394,7 +394,7 @@ TEST(ClientTest, MakeConnectionOptionalArguments) { EXPECT_NE(conn, nullptr); } -TEST(ClientTest, RunTransactionCommit) { +TEST(ClientTest, CommitMutatorSuccess) { auto timestamp = internal::TimestampFromString("2019-08-14T21:16:21.123Z"); ASSERT_STATUS_OK(timestamp); @@ -428,8 +428,9 @@ TEST(ClientTest, RunTransactionCommit) { .WillOnce(DoAll(SaveArg<0>(&actual_commit_params), Return(CommitResult{*timestamp}))); + Client client(conn); auto mutation = MakeDeleteMutation("table", KeySet::All()); - auto f = [&mutation](Client client, Transaction txn) -> StatusOr { + auto mutator = [&client, &mutation](Transaction txn) -> StatusOr { auto read = client.Read(std::move(txn), "T", KeySet::All(), {"C"}); for (auto& row : StreamOf>(read)) { if (!row) return row.status(); @@ -437,8 +438,7 @@ TEST(ClientTest, RunTransactionCommit) { return Mutations{mutation}; }; - Client client(conn); - auto result = RunTransaction(client, Transaction::ReadWriteOptions{}, f); + auto result = client.Commit(mutator); EXPECT_STATUS_OK(result); EXPECT_EQ(*timestamp, result->commit_timestamp); @@ -448,7 +448,7 @@ TEST(ClientTest, RunTransactionCommit) { EXPECT_THAT(actual_commit_params.mutations, ElementsAre(mutation)); } -TEST(ClientTest, RunTransactionRollback) { +TEST(ClientTest, CommitMutatorRollback) { auto conn = std::make_shared(); Transaction txn = MakeReadWriteTransaction(); // dummy Connection::ReadParams actual_read_params{txn, {}, {}, {}, {}}; @@ -475,8 +475,9 @@ TEST(ClientTest, RunTransactionRollback) { Return(ByMove(std::move(result_set))))); EXPECT_CALL(*conn, Rollback(_)).WillOnce(Return(Status())); + Client client(conn); auto mutation = MakeDeleteMutation("table", KeySet::All()); - auto f = [&mutation](Client client, Transaction txn) -> StatusOr { + auto mutator = [&client, &mutation](Transaction txn) -> StatusOr { auto read = client.Read(std::move(txn), "T", KeySet::All(), {"C"}); for (auto& row : read) { if (!row) return row.status(); @@ -484,8 +485,7 @@ TEST(ClientTest, RunTransactionRollback) { return Mutations{mutation}; }; - Client client(conn); - auto result = RunTransaction(client, Transaction::ReadWriteOptions{}, f); + auto result = client.Commit(mutator); EXPECT_FALSE(result.ok()); EXPECT_EQ(StatusCode::kInvalidArgument, result.status().code()); EXPECT_THAT(result.status().message(), HasSubstr("blah")); @@ -495,7 +495,7 @@ TEST(ClientTest, RunTransactionRollback) { EXPECT_THAT(actual_read_params.columns, ElementsAre("C")); } -TEST(ClientTest, RunTransactionRollbackError) { +TEST(ClientTest, CommitMutatorRollbackError) { auto conn = std::make_shared(); Transaction txn = MakeReadWriteTransaction(); // dummy Connection::ReadParams actual_read_params{txn, {}, {}, {}, {}}; @@ -523,8 +523,9 @@ TEST(ClientTest, RunTransactionRollbackError) { EXPECT_CALL(*conn, Rollback(_)) .WillOnce(Return(Status(StatusCode::kInternal, "oops"))); + Client client(conn); auto mutation = MakeDeleteMutation("table", KeySet::All()); - auto f = [&mutation](Client client, Transaction txn) -> StatusOr { + auto mutator = [&client, &mutation](Transaction txn) -> StatusOr { auto read = client.Read(std::move(txn), "T", KeySet::All(), {"C"}); for (auto& row : read) { if (!row) return row.status(); @@ -532,8 +533,7 @@ TEST(ClientTest, RunTransactionRollbackError) { return Mutations{mutation}; }; - Client client(conn); - auto result = RunTransaction(client, Transaction::ReadWriteOptions{}, f); + auto result = client.Commit(mutator); EXPECT_FALSE(result.ok()); EXPECT_EQ(StatusCode::kInvalidArgument, result.status().code()); EXPECT_THAT(result.status().message(), HasSubstr("blah")); @@ -544,7 +544,7 @@ TEST(ClientTest, RunTransactionRollbackError) { } #if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS -TEST(ClientTest, RunTransactionException) { +TEST(ClientTest, CommitMutatorException) { auto conn = std::make_shared(); auto source = make_unique(); @@ -567,8 +567,9 @@ TEST(ClientTest, RunTransactionException) { EXPECT_CALL(*conn, Read(_)).WillOnce(Return(ByMove(std::move(result_set)))); EXPECT_CALL(*conn, Rollback(_)).WillOnce(Return(Status())); + Client client(conn); auto mutation = MakeDeleteMutation("table", KeySet::All()); - auto f = [&mutation](Client client, Transaction txn) -> StatusOr { + auto mutator = [&client, &mutation](Transaction txn) -> StatusOr { auto read = client.Read(std::move(txn), "T", KeySet::All(), {"C"}); for (auto& row : read) { if (!row) throw "Read() error"; @@ -577,8 +578,7 @@ TEST(ClientTest, RunTransactionException) { }; try { - Client client(conn); - auto result = RunTransaction(client, Transaction::ReadWriteOptions{}, f); + auto result = client.Commit(mutator); FAIL(); } catch (char const* e) { EXPECT_STREQ(e, "Read() error"); @@ -588,7 +588,7 @@ TEST(ClientTest, RunTransactionException) { } #endif -TEST(ClientTest, RunTransaction_RetryTransientFailures) { +TEST(ClientTest, CommitMutatorRerunTransientFailures) { auto timestamp = internal::TimestampFromString("2019-08-14T21:16:21.123Z"); ASSERT_STATUS_OK(timestamp); @@ -601,58 +601,66 @@ TEST(ClientTest, RunTransaction_RetryTransientFailures) { return CommitResult{*timestamp}; })); - auto f = [](Client const&, Transaction const&) -> StatusOr { + auto mutator = [](Transaction const&) -> StatusOr { return Mutations{MakeDeleteMutation("table", KeySet::All())}; }; Client client(conn); - auto result = RunTransaction(client, Transaction::ReadWriteOptions{}, f); + auto result = client.Commit(mutator); EXPECT_STATUS_OK(result); EXPECT_EQ(*timestamp, result->commit_timestamp); } -TEST(ClientTest, RunTransaction_TooManyFailures) { +TEST(ClientTest, CommitMutatorTooManyFailures) { + int commit_attempts = 0; + int const maximum_failures = 2; + auto conn = std::make_shared(); EXPECT_CALL(*conn, Commit(_)) - .WillRepeatedly(Invoke([](Connection::CommitParams const&) { - return Status(StatusCode::kAborted, "Aborted transaction"); - })); + .WillRepeatedly( + Invoke([&commit_attempts](Connection::CommitParams const&) { + ++commit_attempts; + return Status(StatusCode::kAborted, "Aborted transaction"); + })); - auto f = [](Client const&, Transaction const&) -> StatusOr { + auto mutator = [](Transaction const&) -> StatusOr { return Mutations{MakeDeleteMutation("table", KeySet::All())}; }; Client client(conn); - // Use a retry policy with a limited number of errors, or this will wait for a + // Use a rerun policy with a limited number of errors, or this will wait for a // long time, also change the backoff policy to sleep for very short periods, // so the unit tests run faster. - auto result = internal::RunTransactionWithPolicies( - client, Transaction::ReadWriteOptions{}, f, - LimitedErrorCountTransactionRerunPolicy(2).clone(), + auto result = client.Commit( + mutator, + LimitedErrorCountTransactionRerunPolicy(maximum_failures).clone(), ExponentialBackoffPolicy(std::chrono::microseconds(10), std::chrono::microseconds(10), 2.0) .clone()); EXPECT_EQ(StatusCode::kAborted, result.status().code()); EXPECT_THAT(result.status().message(), HasSubstr("Aborted transaction")); - EXPECT_THAT(result.status().message(), HasSubstr("Too many failures ")); + EXPECT_EQ(maximum_failures + 1, commit_attempts); // one too many } -TEST(ClientTest, RunTransaction_PermanentFailure) { +TEST(ClientTest, CommitMutatorPermanentFailure) { + int commit_attempts = 0; + auto conn = std::make_shared(); EXPECT_CALL(*conn, Commit(_)) - .WillOnce(Invoke([](Connection::CommitParams const&) { + .WillOnce(Invoke([&commit_attempts](Connection::CommitParams const&) { + ++commit_attempts; return Status(StatusCode::kPermissionDenied, "uh-oh"); })); - auto f = [](Client const&, Transaction const&) -> StatusOr { + auto mutator = [](Transaction const&) -> StatusOr { return Mutations{MakeDeleteMutation("table", KeySet::All())}; }; Client client(conn); - auto result = RunTransaction(client, Transaction::ReadWriteOptions{}, f); + auto result = client.Commit(mutator); EXPECT_EQ(StatusCode::kPermissionDenied, result.status().code()); EXPECT_THAT(result.status().message(), HasSubstr("uh-oh")); - EXPECT_THAT(result.status().message(), HasSubstr("Permanent failure ")); + EXPECT_EQ(1, commit_attempts); // no reruns } } // namespace diff --git a/google/cloud/spanner/integration_tests/client_integration_test.cc b/google/cloud/spanner/integration_tests/client_integration_test.cc index 56d8e1c494b02..a65c85bde9434 100644 --- a/google/cloud/spanner/integration_tests/client_integration_test.cc +++ b/google/cloud/spanner/integration_tests/client_integration_test.cc @@ -40,22 +40,20 @@ class ClientIntegrationTest : public ::testing::Test { } void SetUp() override { - auto commit_result = - RunTransaction(*client_, {}, [](Client const&, Transaction const&) { - return Mutations{MakeDeleteMutation("Singers", KeySet::All())}; - }); + auto commit_result = client_->Commit([](Transaction const&) { + return Mutations{MakeDeleteMutation("Singers", KeySet::All())}; + }); EXPECT_STATUS_OK(commit_result); } void InsertTwoSingers() { - auto commit_result = - RunTransaction(*client_, {}, [](Client const&, Transaction const&) { - return Mutations{InsertMutationBuilder( - "Singers", {"SingerId", "FirstName", "LastName"}) - .EmplaceRow(1, "test-fname-1", "test-lname-1") - .EmplaceRow(2, "test-fname-2", "test-lname-2") - .Build()}; - }); + auto commit_result = client_->Commit([](Transaction const&) { + return Mutations{InsertMutationBuilder( + "Singers", {"SingerId", "FirstName", "LastName"}) + .EmplaceRow(1, "test-fname-1", "test-lname-1") + .EmplaceRow(2, "test-fname-2", "test-lname-2") + .Build()}; + }); ASSERT_STATUS_OK(commit_result); } @@ -91,11 +89,10 @@ TEST_F(ClientIntegrationTest, InsertAndCommit) { TEST_F(ClientIntegrationTest, DeleteAndCommit) { ASSERT_NO_FATAL_FAILURE(InsertTwoSingers()); - auto commit_result = - RunTransaction(*client_, {}, [](Client const&, Transaction const&) { - return Mutations{ - MakeDeleteMutation("Singers", KeySet().AddKey(MakeKey(1)))}; - }); + auto commit_result = client_->Commit([](Transaction const&) { + return Mutations{ + MakeDeleteMutation("Singers", KeySet().AddKey(MakeKey(1)))}; + }); EXPECT_STATUS_OK(commit_result); auto reader = client_->Read("Singers", KeySet::All(), @@ -118,9 +115,9 @@ TEST_F(ClientIntegrationTest, DeleteAndCommit) { TEST_F(ClientIntegrationTest, MultipleInserts) { ASSERT_NO_FATAL_FAILURE(InsertTwoSingers()); - auto commit_result = RunTransaction( - *client_, {}, - [](Client client, Transaction const& txn) -> StatusOr { + auto& client = *client_; + auto commit_result = + client_->Commit([&client](Transaction const& txn) -> StatusOr { auto insert1 = client.ExecuteDml( txn, SqlStatement("INSERT INTO Singers (SingerId, FirstName, LastName) " @@ -166,7 +163,7 @@ TEST_F(ClientIntegrationTest, TransactionRollback) { using RowType = std::tuple; - // Cannot use RunTransaction in this test because we want to call Rollback + // Cannot use Commit in this test because we want to call Rollback // explicitly. This means we need to retry ABORTED calls ourselves. for (auto start = std::chrono::steady_clock::now(), deadline = start + std::chrono::minutes(1); @@ -230,10 +227,10 @@ TEST_F(ClientIntegrationTest, TransactionRollback) { RowType(2, "test-fname-2", "test-lname-2"))); } -/// @test Verify the basics of RunTransaction(). -TEST_F(ClientIntegrationTest, RunTransaction) { +/// @test Verify the basics of Commit(). +TEST_F(ClientIntegrationTest, Commit) { // Insert SingerIds 100, 102, and 199. - auto inserter = [](Client const&, Transaction const&) { + auto inserter = [](Transaction const&) { auto isb = InsertMutationBuilder("Singers", {"SingerId", "FirstName", "LastName"}) .EmplaceRow(100, "first-name-100", "last-name-100") @@ -241,17 +238,17 @@ TEST_F(ClientIntegrationTest, RunTransaction) { .EmplaceRow(199, "first-name-199", "last-name-199"); return Mutations{isb.Build()}; }; - auto insert_result = RunTransaction(*client_, {}, inserter); + auto insert_result = client_->Commit(inserter); EXPECT_STATUS_OK(insert_result); EXPECT_NE(Timestamp{}, insert_result->commit_timestamp); // Delete SingerId 102. - auto deleter = [](Client const&, Transaction const&) { + auto deleter = [](Transaction const&) { auto mutation = MakeDeleteMutation("Singers", KeySet().AddKey(MakeKey(102))); return Mutations{mutation}; }; - auto delete_result = RunTransaction(*client_, {}, deleter); + auto delete_result = client_->Commit(deleter); EXPECT_STATUS_OK(delete_result); EXPECT_LT(insert_result->commit_timestamp, delete_result->commit_timestamp); @@ -269,8 +266,9 @@ TEST_F(ClientIntegrationTest, RunTransaction) { /// @test Test various forms of ExecuteQuery() and ExecuteDml() TEST_F(ClientIntegrationTest, ExecuteQueryDml) { - auto insert_result = RunTransaction( - *client_, {}, [](Client client, Transaction txn) -> StatusOr { + auto& client = *client_; + auto insert_result = + client_->Commit([&client](Transaction txn) -> StatusOr { auto insert = client.ExecuteDml( std::move(txn), SqlStatement(R"sql( INSERT INTO Singers (SingerId, FirstName, LastName) @@ -285,10 +283,8 @@ TEST_F(ClientIntegrationTest, ExecuteQueryDml) { using RowType = std::tuple; std::vector expected_rows; - auto commit_result = RunTransaction( - *client_, {}, - [&expected_rows](Client client, - Transaction const& txn) -> StatusOr { + auto commit_result = client_->Commit( + [&client, &expected_rows](Transaction const& txn) -> StatusOr { expected_rows.clear(); for (int i = 2; i != 10; ++i) { auto s = std::to_string(i); @@ -330,8 +326,9 @@ TEST_F(ClientIntegrationTest, ExecuteQueryDml) { /// @test Test ExecutePartitionedDml TEST_F(ClientIntegrationTest, ExecutePartitionedDml) { - auto insert_result = RunTransaction( - *client_, {}, [](Client client, Transaction txn) -> StatusOr { + auto& client = *client_; + auto insert_result = + client_->Commit([&client](Transaction txn) -> StatusOr { auto insert = client.ExecuteDml( std::move(txn), SqlStatement(R"sql( INSERT INTO Singers (SingerId, FirstName, LastName) @@ -356,10 +353,8 @@ void CheckReadWithOptions( options_generator) { using RowValues = std::vector; std::vector expected_rows; - auto commit = RunTransaction( - client, Transaction::ReadWriteOptions{}, - [&expected_rows](Client const&, - Transaction const&) -> StatusOr { + auto commit = client.Commit( + [&expected_rows](Transaction const&) -> StatusOr { expected_rows.clear(); InsertMutationBuilder insert("Singers", {"SingerId", "FirstName", "LastName"}); @@ -440,10 +435,8 @@ void CheckExecuteQueryWithSingleUseOptions( options_generator) { using RowValues = std::vector; std::vector expected_rows; - auto commit = RunTransaction( - client, Transaction::ReadWriteOptions{}, - [&expected_rows](Client const&, - Transaction const&) -> StatusOr { + auto commit = client.Commit( + [&expected_rows](Transaction const&) -> StatusOr { InsertMutationBuilder insert("Singers", {"SingerId", "FirstName", "LastName"}); for (int i = 1; i != 10; ++i) { @@ -521,13 +514,10 @@ TEST_F(ClientIntegrationTest, ExecuteQuery_ExactStaleness_Duration) { }); } -StatusOr>> AddSingerDataToTable( - Client const& client) { +StatusOr>> AddSingerDataToTable(Client client) { std::vector> expected_rows; - auto commit = RunTransaction( - client, Transaction::ReadWriteOptions{}, - [&expected_rows](Client const&, - Transaction const&) -> StatusOr { + auto commit = client.Commit( + [&expected_rows](Transaction const&) -> StatusOr { expected_rows.clear(); InsertMutationBuilder insert("Singers", {"SingerId", "FirstName", "LastName"}); @@ -642,12 +632,12 @@ TEST_F(ClientIntegrationTest, ExecuteBatchDml) { "WHERE FirstName = 'Foo1' or FirstName = 'Foo3'"), }; + auto& client = *client_; StatusOr batch_result; - auto commit_result = RunTransaction( - *client_, {}, - [&batch_result, &statements](Client c, - Transaction txn) -> StatusOr { - batch_result = c.ExecuteBatchDml(std::move(txn), statements); + auto commit_result = + client_->Commit([&client, &batch_result, + &statements](Transaction txn) -> StatusOr { + batch_result = client.ExecuteBatchDml(std::move(txn), statements); if (!batch_result) return batch_result.status(); if (!batch_result->status.ok()) return batch_result->status; return Mutations{}; @@ -705,17 +695,17 @@ TEST_F(ClientIntegrationTest, ExecuteBatchDmlMany) { std::vector left(v.begin(), v.begin() + v.size() / 2); std::vector right(v.begin() + v.size() / 2, v.end()); + auto& client = *client_; StatusOr batch_result_left; StatusOr batch_result_right; - auto commit_result = RunTransaction( - *client_, {}, - [&batch_result_left, &batch_result_right, &left, &right]( - Client c, Transaction txn) -> StatusOr { - batch_result_left = c.ExecuteBatchDml(txn, left); + auto commit_result = + client_->Commit([&client, &batch_result_left, &batch_result_right, &left, + &right](Transaction txn) -> StatusOr { + batch_result_left = client.ExecuteBatchDml(txn, left); if (!batch_result_left) return batch_result_left.status(); if (!batch_result_left->status.ok()) return batch_result_left->status; - batch_result_right = c.ExecuteBatchDml(std::move(txn), right); + batch_result_right = client.ExecuteBatchDml(std::move(txn), right); if (!batch_result_right) return batch_result_right.status(); if (!batch_result_right->status.ok()) return batch_result_right->status; @@ -768,12 +758,12 @@ TEST_F(ClientIntegrationTest, ExecuteBatchDmlFailure) { "WHERE FirstName = 'Foo1' or FirstName = 'Foo3'"), }; + auto& client = *client_; StatusOr batch_result; - auto commit_result = RunTransaction( - *client_, {}, - [&batch_result, &statements](Client c, - Transaction txn) -> StatusOr { - batch_result = c.ExecuteBatchDml(std::move(txn), statements); + auto commit_result = + client_->Commit([&client, &batch_result, + &statements](Transaction txn) -> StatusOr { + batch_result = client.ExecuteBatchDml(std::move(txn), statements); if (!batch_result) return batch_result.status(); if (!batch_result->status.ok()) return batch_result->status; return Mutations{}; diff --git a/google/cloud/spanner/integration_tests/client_stress_test.cc b/google/cloud/spanner/integration_tests/client_stress_test.cc index e1a7f16d2b683..8e4782a3098c6 100644 --- a/google/cloud/spanner/integration_tests/client_stress_test.cc +++ b/google/cloud/spanner/integration_tests/client_stress_test.cc @@ -85,10 +85,8 @@ TEST(ClientSqlStressTest, UpsertAndSelect) { auto action = static_cast(random_action(generator)); if (action == kInsert) { - auto commit = spanner::RunTransaction( - client, {}, - [key](Client const&, - Transaction const&) -> StatusOr { + auto commit = client.Commit( + [key](Transaction const&) -> StatusOr { auto s = std::to_string(key); return Mutations{spanner::MakeInsertOrUpdateMutation( "Singers", {"SingerId", "FirstName", "LastName"}, key, @@ -154,10 +152,8 @@ TEST(ClientStressTest, UpsertAndRead) { auto action = static_cast(random_action(generator)); if (action == kInsert) { - auto commit = spanner::RunTransaction( - client, {}, - [key](Client const&, - Transaction const&) -> StatusOr { + auto commit = client.Commit( + [key](Transaction const&) -> StatusOr { auto s = std::to_string(key); return Mutations{spanner::MakeInsertOrUpdateMutation( "Singers", {"SingerId", "FirstName", "LastName"}, key, diff --git a/google/cloud/spanner/integration_tests/rpc_failure_threshold_integration_test.cc b/google/cloud/spanner/integration_tests/rpc_failure_threshold_integration_test.cc index 2721a3b55dfa0..bc9328f1f727e 100644 --- a/google/cloud/spanner/integration_tests/rpc_failure_threshold_integration_test.cc +++ b/google/cloud/spanner/integration_tests/rpc_failure_threshold_integration_test.cc @@ -124,9 +124,8 @@ Result RunExperiment(Database const& db, int iterations) { int const report = iterations / 5; for (int i = 0; i != iterations; ++i) { if (i % report == 0) std::cout << '.' << std::flush; - auto delete_status = RunTransaction( - client, Transaction::ReadWriteOptions{}, - [&](Client client, Transaction const& txn) -> StatusOr { + auto delete_status = + client.Commit([&client](Transaction const& txn) -> StatusOr { auto status = client.ExecuteDml( txn, SqlStatement("DELETE FROM Singers WHERE true")); if (!status) return std::move(status).status(); diff --git a/google/cloud/spanner/integration_tests/throughput_benchmark.cc b/google/cloud/spanner/integration_tests/throughput_benchmark.cc index 01f6c9e7c9f00..f4425bd6b7611 100644 --- a/google/cloud/spanner/integration_tests/throughput_benchmark.cc +++ b/google/cloud/spanner/integration_tests/throughput_benchmark.cc @@ -127,11 +127,9 @@ class InsertSingleRow : public Experiment { "Singers", {"SingerId", "FirstName", "LastName"}, key, make_string("fname:", key), make_string("lname:", key)); auto start_txn = std::chrono::steady_clock::now(); - auto result = RunTransaction(client, {}, - [&m](cloud_spanner::Client const&, - cloud_spanner::Transaction const&) { - return cloud_spanner::Mutations{m}; - }); + auto result = client.Commit([&m](cloud_spanner::Transaction const&) { + return cloud_spanner::Mutations{m}; + }); samples.push_back( {Operation::kInsert, 1, ElapsedTime(start_txn), result.ok()}); @@ -185,12 +183,9 @@ class InsertMultipleRows : public Experiment { } auto start_txn = std::chrono::steady_clock::now(); auto m = std::move(builder).Build(); - auto result = cloud_spanner::RunTransaction( - client, {}, - [&m](cloud_spanner::Client const&, - cloud_spanner::Transaction const&) { - return cloud_spanner::Mutations{m}; - }); + auto result = client.Commit([&m](cloud_spanner::Transaction const&) { + return cloud_spanner::Mutations{m}; + }); samples.push_back( {Operation::kInsert, row_count, ElapsedTime(start_txn), result.ok()}); @@ -206,7 +201,7 @@ class InsertMultipleRows : public Experiment { /// The work of a single thread in the 'InsertSingleRow' experiment. class SelectSingleRow : public Experiment { public: - void SetUpTask(Config const& config, cloud_spanner::Client const& client, + void SetUpTask(Config const& config, cloud_spanner::Client client, int task_count, int task_id) { auto generator = google::cloud::internal::MakeDefaultPRNG(); std::uniform_int_distribution random_key(0, @@ -229,11 +224,9 @@ class SelectSingleRow : public Experiment { auto m = cloud_spanner::MakeInsertOrUpdateMutation( "Singers", {"SingerId", "FirstName", "LastName"}, key, make_string("fname:", key), make_string("lname:", key)); - (void)RunTransaction(client, {}, - [&m](cloud_spanner::Client const&, - cloud_spanner::Transaction const&) { - return cloud_spanner::Mutations{m}; - }); + (void)client.Commit([&m](cloud_spanner::Transaction const&) { + return cloud_spanner::Mutations{m}; + }); } } diff --git a/google/cloud/spanner/mutations.h b/google/cloud/spanner/mutations.h index 917d754f2572f..9ab21197f3ae5 100644 --- a/google/cloud/spanner/mutations.h +++ b/google/cloud/spanner/mutations.h @@ -88,7 +88,7 @@ class Mutation { /** * An ordered sequence of mutations to pass to `Client::Commit()` or return - * from the `Client::RunTransaction()` functor. + * from the `Client::Commit()` mutator. */ using Mutations = std::vector; diff --git a/google/cloud/spanner/samples/samples.cc b/google/cloud/spanner/samples/samples.cc index 2e9df0bd6f3c8..827d0cf42be29 100644 --- a/google/cloud/spanner/samples/samples.cc +++ b/google/cloud/spanner/samples/samples.cc @@ -822,7 +822,7 @@ void ReadWriteTransaction(google::cloud::spanner::Client client) { {"MarketingBudget"}); for (auto row : spanner::StreamOf>(read)) { // Return the error (as opposed to throwing an exception) because - // RunTransaction() only retries on StatusCode::kAborted. + // Commit() only retries on StatusCode::kAborted. if (!row) return std::move(row).status(); // We expect at most one result from the `Read()` request. Return // the first one. @@ -833,10 +833,8 @@ void ReadWriteTransaction(google::cloud::spanner::Client client) { "," + std::to_string(album_id) + ")"); }; - auto commit = spanner::RunTransaction( - std::move(client), {}, - [&get_current_budget](spanner::Client const& client, - spanner::Transaction const& txn) + auto commit = client.Commit( + [&client, &get_current_budget](spanner::Transaction const& txn) -> google::cloud::StatusOr { auto b1 = get_current_budget(client, txn, 1, 1); if (!b1) return std::move(b1).status(); @@ -861,10 +859,8 @@ void ReadWriteTransaction(google::cloud::spanner::Client client) { void DmlStandardInsert(google::cloud::spanner::Client client) { using google::cloud::StatusOr; namespace spanner = google::cloud::spanner; - auto commit_result = spanner::RunTransaction( - std::move(client), spanner::Transaction::ReadWriteOptions{}, - [](spanner::Client client, - spanner::Transaction txn) -> StatusOr { + auto commit_result = client.Commit( + [&client](spanner::Transaction txn) -> StatusOr { auto insert = client.ExecuteDml( std::move(txn), spanner::SqlStatement( @@ -885,10 +881,8 @@ void DmlStandardInsert(google::cloud::spanner::Client client) { void DmlStandardUpdate(google::cloud::spanner::Client client) { using google::cloud::StatusOr; namespace spanner = google::cloud::spanner; - auto commit_result = spanner::RunTransaction( - std::move(client), spanner::Transaction::ReadWriteOptions{}, - [](spanner::Client client, - spanner::Transaction txn) -> StatusOr { + auto commit_result = client.Commit( + [&client](spanner::Transaction txn) -> StatusOr { auto update = client.ExecuteDml( std::move(txn), spanner::SqlStatement( @@ -909,17 +903,14 @@ void DmlStandardUpdate(google::cloud::spanner::Client client) { void DmlStandardDelete(google::cloud::spanner::Client client) { using google::cloud::StatusOr; namespace spanner = google::cloud::spanner; - auto commit_result = spanner::RunTransaction( - std::move(client), spanner::Transaction::ReadWriteOptions{}, - [](spanner::Client client, - spanner::Transaction txn) -> StatusOr { - auto dele = client.ExecuteDml( - std::move(txn), - spanner::SqlStatement( - "DELETE FROM Singers WHERE FirstName = 'Alice'")); - if (!dele) return dele.status(); - return spanner::Mutations{}; - }); + auto commit_result = client.Commit([&client](spanner::Transaction txn) + -> StatusOr { + auto dele = client.ExecuteDml( + std::move(txn), + spanner::SqlStatement("DELETE FROM Singers WHERE FirstName = 'Alice'")); + if (!dele) return dele.status(); + return spanner::Mutations{}; + }); if (!commit_result) { throw std::runtime_error(commit_result.status().message()); }