Skip to content

Commit

Permalink
chore(spanner): fix mutation only case in rw mux with aborted error (#…
Browse files Browse the repository at this point in the history
…3571)

* chore(Spanner): fix mutation only case ibn rw using mux with aborted errors

* chore(Spanner): fix mockspanner

* chore(spanner): update logic for mutations only

* chore(spanner): lint fix

* chore(spanner): update logic to handle begin txn retry only for aborted case

* chore(spanner): add span for abort
  • Loading branch information
harshachinta authored Jan 30, 2025
1 parent 68031f1 commit f4560e5
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,19 @@ private void createTxnAsync(
}
res.set(null);
} catch (ExecutionException e) {
SpannerException spannerException = SpannerExceptionFactory.asSpannerException(e);
if (spannerException.getErrorCode() == ErrorCode.ABORTED
&& session.getIsMultiplexed()
&& mutation != null) {
// Begin transaction can return ABORTED errors. This can only happen if it included
// a mutation key, which again means that this is a mutation-only transaction on a
// multiplexed session.
span.addAnnotation(
"Transaction Creation Failed with ABORT. Retrying",
e.getCause() == null ? e : e.getCause());
createTxnAsync(res, mutation);
return;
}
span.addAnnotation(
"Transaction Creation Failed", e.getCause() == null ? e : e.getCause());
res.setException(e.getCause() == null ? e : e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1919,7 +1919,8 @@ private Transaction beginTransaction(
}
if (session.getMultiplexed()
&& options.getModeCase() == ModeCase.READ_WRITE
&& mutationKey != null) {
&& mutationKey != null
&& mutationKey != com.google.spanner.v1.Mutation.getDefaultInstance()) {
// Mutation only case in a read-write transaction.
builder.setPrecommitToken(getTransactionPrecommitToken(transactionId));
}
Expand Down Expand Up @@ -2023,6 +2024,14 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
return;
}
sessionLastUsed.put(session.getName(), Instant.now());
if (session.getMultiplexed()
&& !request.hasPrecommitToken()
&& !request.hasSingleUseTransaction()) {
throw Status.INVALID_ARGUMENT
.withDescription(
"A Commit request for a read-write transaction on a multiplexed session must specify a precommit token.")
.asRuntimeException();
}
try {
commitExecutionTime.simulateExecutionTime(exceptions, stickyGlobalExceptions, freezeLock);
// Find or start a transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,204 @@ public void testMutationOnlyUsingAsyncTransactionManager() {
request.getPrecommitToken().getPrecommitToken());
}

private Spanner setupSpannerForAbortedBeginTransactionTests() {
// Force the BeginTransaction RPC to return Aborted the first time it is called. The exception
// is cleared after the first call, so the retry should succeed.
mockSpanner.setBeginTransactionExecutionTime(
SimulatedExecutionTime.ofException(
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));

return SpannerOptions.newBuilder()
.setProjectId("test-project")
.setChannelProvider(channelProvider)
.setCredentials(NoCredentials.getInstance())
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setUseMultiplexedSession(true)
.setUseMultiplexedSessionForRW(true)
.setSkipVerifyingBeginTransactionForMuxRW(true)
.build())
.build()
.getService();
}

private void verifyMutationKeySetInBeginTransactionRequests(
List<BeginTransactionRequest> beginTransactionRequests) {
assertEquals(2, beginTransactionRequests.size());
// Verify the requests are executed using multiplexed sessions
for (BeginTransactionRequest request : beginTransactionRequests) {
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
assertTrue(request.hasMutationKey());
assertTrue(request.getMutationKey().hasInsert());
}
}

private void verifyPreCommitTokenSetInCommitRequest(List<CommitRequest> commitRequests) {
assertEquals(1L, commitRequests.size());
for (CommitRequest request : commitRequests) {
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
assertNotNull(request.getPrecommitToken());
assertEquals(
ByteString.copyFromUtf8("TransactionPrecommitToken"),
request.getPrecommitToken().getPrecommitToken());
}
}

// The following 4 tests validate mutation-only cases where the BeginTransaction RPC fails with an
// ABORTED or retryable error
@Test
public void testMutationOnlyCaseAbortedDuringBeginTransaction() {
// This test ensures that when a transaction containing only mutations is retried after an
// ABORT error in the BeginTransaction RPC:
// 1. The mutation key is correctly included in the BeginTransaction request.
// 2. The precommit token is properly set in the Commit request.
Spanner spanner = setupSpannerForAbortedBeginTransactionTests();
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));

client
.readWriteTransaction()
.run(
transaction -> {
Mutation mutation =
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build();
transaction.buffer(mutation);
return null;
});

// Verify that for mutation only case, a mutation key is set in BeginTransactionRequest.
List<BeginTransactionRequest> beginTransactionRequests =
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests);

// Verify that the latest precommit token is set in the CommitRequest
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
verifyPreCommitTokenSetInCommitRequest(commitRequests);

spanner.close();
}

@Test
public void testMutationOnlyUsingTransactionManagerAbortedDuringBeginTransaction() {
// This test ensures that when a transaction containing only mutations is retried after an
// ABORT error in the BeginTransaction RPC:
// 1. The mutation key is correctly included in the BeginTransaction request.
// 2. The precommit token is properly set in the Commit request.
Spanner spanner = setupSpannerForAbortedBeginTransactionTests();
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));

try (TransactionManager manager = client.transactionManager()) {
TransactionContext transaction = manager.begin();
while (true) {
try {
Mutation mutation =
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build();
transaction.buffer(mutation);
manager.commit();
assertNotNull(manager.getCommitTimestamp());
break;
} catch (AbortedException e) {
transaction = manager.resetForRetry();
}
}
}

// Verify that for mutation only case, a mutation key is set in BeginTransactionRequest.
List<BeginTransactionRequest> beginTransactionRequests =
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests);

// Verify that the latest precommit token is set in the CommitRequest
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
verifyPreCommitTokenSetInCommitRequest(commitRequests);

spanner.close();
}

@Test
public void testMutationOnlyUsingAsyncRunnerAbortedDuringBeginTransaction() {
// This test ensures that when a transaction containing only mutations is retried after an
// ABORT error in the BeginTransaction RPC:
// 1. The mutation key is correctly included in the BeginTransaction request.
// 2. The precommit token is properly set in the Commit request.

Spanner spanner = setupSpannerForAbortedBeginTransactionTests();
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));

AsyncRunner runner = client.runAsync();
get(
runner.runAsync(
txn -> {
txn.buffer(
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build());
return ApiFutures.immediateFuture(null);
},
MoreExecutors.directExecutor()));

// Verify that for mutation only case, a mutation key is set in BeginTransactionRequest.
List<BeginTransactionRequest> beginTransactionRequests =
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests);

// Verify that the latest precommit token is set in the CommitRequest
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
verifyPreCommitTokenSetInCommitRequest(commitRequests);

spanner.close();
}

@Test
public void testMutationOnlyUsingTransactionManagerAsyncAbortedDuringBeginTransaction()
throws Exception {
// This test verifies that in the case of mutations-only, when a transaction is retried after an
// ABORT in BeginTransaction RPC, the mutation key is correctly set in the BeginTransaction
// request
// and precommit token is set in Commit request.
Spanner spanner = setupSpannerForAbortedBeginTransactionTests();
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));

try (AsyncTransactionManager manager = client.transactionManagerAsync()) {
TransactionContextFuture transaction = manager.beginAsync();
while (true) {
CommitTimestampFuture commitTimestamp =
transaction
.then(
(txn, input) -> {
txn.buffer(
Mutation.newInsertBuilder("FOO")
.set("ID")
.to(1L)
.set("NAME")
.to("Bar")
.build());
return ApiFutures.immediateFuture(null);
},
MoreExecutors.directExecutor())
.commitAsync();
try {
assertThat(commitTimestamp.get()).isNotNull();
break;
} catch (AbortedException e) {
transaction = manager.resetForRetryAsync();
}
}
}

// Verify that for mutation only case, a mutation key is set in BeginTransactionRequest.
List<BeginTransactionRequest> beginTransactionRequests =
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
verifyMutationKeySetInBeginTransactionRequests(beginTransactionRequests);

// Verify that the latest precommit token is set in the CommitRequest
List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
verifyPreCommitTokenSetInCommitRequest(commitRequests);

spanner.close();
}

// Tests the behavior of the server-side kill switch for read-write multiplexed sessions..
@Test
public void testInitialBeginTransactionWithRW_receivesUnimplemented_fallsBackToRegularSession() {
Expand Down

0 comments on commit f4560e5

Please sign in to comment.