Skip to content

Commit

Permalink
feat!: replace RunTransaction() with Client::Commit(functor) (googlea…
Browse files Browse the repository at this point in the history
…pis/google-cloud-cpp-spanner#975)

Replace g:c:s:RunTransaction() with a new g:c:s:Client::Commit()
overload.  This is similar to the existing Commit(), but you
specify how to create the mutations, instead of passing them
directly, which is what is needed to create the rerun loop.

The "mutator" callback no longer receives the target Client as
an argument because many mutators don't need it.  If you want
a client in your callback, you will need to bind it yourself.

The Transaction::ReadWriteOptions have been removed as there
are no options, and being able to preallocate RW transactions
pretty much requires that remains the case.

Session affinity over reruns is still to be addressed.

BREAKING CHANGE

Fixes googleapis/google-cloud-cpp-spanner#557.
  • Loading branch information
devbww authored Oct 24, 2019
1 parent 15eea0f commit e3b215d
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 293 deletions.
141 changes: 68 additions & 73 deletions google/cloud/spanner/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,74 @@ StatusOr<BatchDmlResult> Client::ExecuteBatchDml(
{std::move(transaction), std::move(statements)});
}

StatusOr<CommitResult> Client::Commit(
std::function<StatusOr<Mutations>(Transaction)> const& mutator,
std::unique_ptr<TransactionRerunPolicy> rerun_policy,
std::unique_ptr<BackoffPolicy> backoff_policy) {
// The status-code discriminator of TransactionRerunPolicy.
using RerunnablePolicy = internal::SafeTransactionRerun;

for (int rerun = 0;; ++rerun) {
// TODO(#472): Make this transaction use the same session each time.
Transaction txn = MakeReadWriteTransaction();
StatusOr<Mutations> mutations;
#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
try {
#endif
mutations = mutator(txn);
#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
} catch (...) {
auto rb_status = Rollback(txn);
if (!RerunnablePolicy::IsOk(rb_status)) {
GCP_LOG(WARNING) << "Rollback() failure in Client::Commit(): "
<< rb_status.message();
}
throw;
}
#endif
auto status = mutations.status();
if (RerunnablePolicy::IsOk(status)) {
auto result = Commit(txn, *mutations);
status = result.status();
if (!RerunnablePolicy::IsTransientFailure(status)) {
return result;
}
} else {
if (!RerunnablePolicy::IsTransientFailure(status)) {
auto rb_status = Rollback(txn);
if (!RerunnablePolicy::IsOk(rb_status)) {
GCP_LOG(WARNING) << "Rollback() failure in Client::Commit(): "
<< rb_status.message();
}
return status;
}
}
// A transient failure (i.e., kAborted), so consider rerunning.
if (!rerun_policy->OnFailure(status)) {
return status; // reruns exhausted
}
std::this_thread::sleep_for(backoff_policy->OnCompletion());
}
}

StatusOr<CommitResult> Client::Commit(
std::function<StatusOr<Mutations>(Transaction)> const& mutator) {
auto const rerun_maximum_duration = std::chrono::minutes(10);
auto default_commit_rerun_policy =
LimitedTimeTransactionRerunPolicy(rerun_maximum_duration).clone();

auto const backoff_initial_delay = std::chrono::milliseconds(100);
auto const backoff_maximum_delay = std::chrono::minutes(5);
auto const backoff_scaling = 2.0;
auto default_commit_backoff_policy =
ExponentialBackoffPolicy(backoff_initial_delay, backoff_maximum_delay,
backoff_scaling)
.clone();

return Commit(mutator, std::move(default_commit_rerun_policy),
std::move(default_commit_backoff_policy));
}

StatusOr<CommitResult> Client::Commit(Transaction transaction,
Mutations mutations) {
return conn_->Commit({std::move(transaction), std::move(mutations)});
Expand All @@ -155,79 +223,6 @@ std::shared_ptr<Connection> MakeConnection(Database const& db,
return internal::MakeConnection(db, std::move(stub));
}

namespace {

StatusOr<CommitResult> RunTransactionImpl(
Client& client, Transaction::ReadWriteOptions const& opts,
std::function<StatusOr<Mutations>(Client, Transaction)> const& f) {
Transaction txn = MakeReadWriteTransaction(opts);
StatusOr<Mutations> mutations;
#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
try {
#endif
mutations = f(client, txn);
#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
} catch (...) {
auto status = client.Rollback(txn);
if (!status.ok()) {
GCP_LOG(WARNING) << "Rollback() failure in RunTransaction(): "
<< status.message();
}
throw;
}
#endif
if (!mutations) {
auto status = client.Rollback(txn);
if (!status.ok()) {
GCP_LOG(WARNING) << "Rollback() failure in RunTransaction(): "
<< status.message();
}
return mutations.status();
}
return client.Commit(txn, *mutations);
}

} // namespace

namespace internal {

StatusOr<CommitResult> RunTransactionWithPolicies(
Client client, Transaction::ReadWriteOptions const& opts,
std::function<StatusOr<Mutations>(Client, Transaction)> const& f,
std::unique_ptr<TransactionRerunPolicy> rerun_policy,
std::unique_ptr<BackoffPolicy> backoff_policy) {
Status last_status(
StatusCode::kFailedPrecondition,
"Retry policy should not be exhausted when retry loop starts");
char const* reason = "Too many failures in ";
while (!rerun_policy->IsExhausted()) {
auto result = RunTransactionImpl(client, opts, f);
if (result) return result;
last_status = std::move(result).status();
if (!rerun_policy->OnFailure(last_status)) {
if (internal::SafeTransactionRerun::IsPermanentFailure(last_status)) {
reason = "Permanent failure in ";
}
break;
}
std::this_thread::sleep_for(backoff_policy->OnCompletion());
}
return internal::RetryLoopError(reason, __func__, last_status);
}

std::unique_ptr<TransactionRerunPolicy> DefaultRunTransactionRerunPolicy() {
return LimitedTimeTransactionRerunPolicy(
/*maximum_duration=*/std::chrono::minutes(10))
.clone();
}

std::unique_ptr<BackoffPolicy> DefaultRunTransactionBackoffPolicy() {
return ExponentialBackoffPolicy(std::chrono::milliseconds(100),
std::chrono::minutes(5), 2.0)
.clone();
}

} // namespace internal
} // namespace SPANNER_CLIENT_NS
} // namespace spanner
} // namespace cloud
Expand Down
117 changes: 49 additions & 68 deletions google/cloud/spanner/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ class Client {
*
* As with all read-write transactions, the results will not be visible
* outside of the transaction until it is committed. For that reason, it is
* advisable to run this method with `RunTransaction`.
* advisable to run this method from a `Commit` mutator.
*
* @warning A returned status of OK from this function does not imply that
* all the statements were executed successfully. For that, you need to
Expand All @@ -413,13 +413,54 @@ class Client {
/**
* Commits a read-write transaction.
*
* The commit might return an `ABORTED` error. This can occur at any time;
* commonly, the cause is conflicts with concurrent transactions. However, it
* can also happen for a variety of other reasons. If `Commit` returns
* `ABORTED`, the caller should re-attempt the transaction from the beginning,
* re-using the same session.
* Calls the @p mutator in the context of a new read-write transaction.
* The @p mutator can execute read/write operations using the transaction,
* and returns any additional `Mutations` to commit.
*
* @warning It is an error to call `Commit` with a read-only `transaction`.
* If the @p mutator returns `StatusCode::kAborted` or the transaction commit
* results in an abort, then that transaction is rolled back and the process
* repeats (subject to @p rerun_policy and @p backoff_policy), by building a
* new transaction and re-running the @p mutator. The lock priority of the
* operation increases after each aborted transaction, meaning that the next
* attempt has a slightly better chance of success.
*
* If the @p mutator and the commit succeed, the `CommitResult` is returned.
* Otherwise the error returned by the @p mutator or the commit is returned.
*
* @param mutator the function called to create mutations
* @param rerun_policy controls for how long (or how many times) the mutator
* will be rerun after the transaction aborts.
* @param backoff_policy controls how long `Commit` waits between reruns.
*/
StatusOr<CommitResult> Commit(
std::function<StatusOr<Mutations>(Transaction)> const& mutator,
std::unique_ptr<TransactionRerunPolicy> rerun_policy,
std::unique_ptr<BackoffPolicy> backoff_policy);

/**
* Commits a read-write transaction.
*
* Same as above, but uses the default rerun and backoff policies.
*
* @param mutator the function called to create mutations
*/
StatusOr<CommitResult> Commit(
std::function<StatusOr<Mutations>(Transaction)> const& mutator);

/**
* Commits a read-write transaction.
*
* The commit might return a `kAborted` error. This can occur at any time;
* commonly, the cause is conflicts with concurrent transactions. However,
* it can also happen for a variety of other reasons. If `Commit` returns
* `kAborted`, the caller may try to reapply the mutations within a new
* read-write transaction (which should use the same session to ensure an
* increase it lock priority).
*
* @note Prefer the previous `Commit` overloads if you want to simply reapply
* mutations after a `kAborted` error.
*
* @warning It is an error to call `Commit` with a read-only transaction.
*
* @param transaction The transaction to commit.
* @param mutations The mutations to be executed when this transaction
Expand All @@ -439,7 +480,7 @@ class Client {
* that includes one or more `Read`, `ExecuteQuery`, or `ExecuteDml` requests
* and ultimately decides not to commit.
*
* @warning It is an error to call `Rollback` with a read-only `transaction`.
* @warning It is an error to call `Rollback` with a read-only transaction.
*
* @param transaction The transaction to roll back.
*
Expand Down Expand Up @@ -486,66 +527,6 @@ class Client {
std::shared_ptr<Connection> MakeConnection(
Database const& db, ConnectionOptions const& options = ConnectionOptions());

namespace internal {
/**
* Execute a function in the context of a read-write transaction, with
* automatic retries if the transaction commit results in an abort.
*
* The caller-provided function will be passed the `Client` argument and a
* newly created read-write `Transaction`. It should use these objects to
* issue any `Read()`s, `ExecuteQuery()`s, `ExecuteDml()`s, etc., and return the
* `Mutation`s to commit, or an error (which causes the transaction to be rolled
* back).
*
* The lock priority of the transaction increases after each prior aborted
* transaction, meaning that the next attempt has a slightly better chance
* of success than before.
*
* @param client how to contact Cloud Spanner
* @param opts options for the transaction created by this function
* @param f the function to call in the transaction.
* @param retry_policy controls for how long (or how many times) will the
* function retry the operation when there is a retryable failure.
* @param backoff_policy controls how long does the function wait between
* retries.
*/
StatusOr<CommitResult> RunTransactionWithPolicies(
Client client, Transaction::ReadWriteOptions const& opts,
std::function<StatusOr<Mutations>(Client, Transaction)> const& f,
std::unique_ptr<TransactionRerunPolicy> rerun_policy,
std::unique_ptr<BackoffPolicy> backoff_policy);

/// The default rerun policy for RunTransaction()
std::unique_ptr<TransactionRerunPolicy> DefaultRunTransactionRerunPolicy();

/// The default backoff policy for RunTransaction()
std::unique_ptr<BackoffPolicy> DefaultRunTransactionBackoffPolicy();

} // namespace internal

/**
* Execute a function in the context of a read-write transaction, with
* automatic retries if the transaction commit results in an abort.
*
* The caller-provided function will be passed the `Client` argument and a
* newly created read-write `Transaction`. It should use these objects to
* issue any `Read()`s, `ExecuteQuery()`s, `ExecuteDml()`s, etc., and return
* the `Mutation`s to commit, or an error (which causes the transaction to be
* rolled back).
*
* The lock priority of the transaction increases after each prior aborted
* transaction, meaning that the next attempt has a slightly better chance
* of success than before.
*/
inline StatusOr<CommitResult> RunTransaction(
Client client, Transaction::ReadWriteOptions const& opts,
std::function<StatusOr<Mutations>(Client, Transaction)> f) {
return internal::RunTransactionWithPolicies(
std::move(client), opts, std::move(f),
internal::DefaultRunTransactionRerunPolicy(),
internal::DefaultRunTransactionBackoffPolicy());
}

} // namespace SPANNER_CLIENT_NS
} // namespace spanner
} // namespace cloud
Expand Down
Loading

0 comments on commit e3b215d

Please sign in to comment.