From edc5bbf0d9d4faf48fd9a8d479d5bc5de938c82d Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Mon, 6 Feb 2023 13:29:38 +0530 Subject: [PATCH 01/25] fix: prevent illegal negative timeout values into thread sleep() method while retrying exceptions in unit tests. * For details on issue see - https://github.com/googleapis/java-spanner/issues/2206 --- .../com/google/cloud/spanner/it/ITClosedSessionTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITClosedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITClosedSessionTest.java index aeb0256285b..227611a10de 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITClosedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITClosedSessionTest.java @@ -251,7 +251,10 @@ public void testTransactionManager() throws InterruptedException { break; } } catch (AbortedException e) { - Thread.sleep(e.getRetryDelayInMillis()); + long retryDelayInMillis = e.getRetryDelayInMillis(); + if(retryDelayInMillis > 0) { + Thread.sleep(retryDelayInMillis); + } txn = manager.resetForRetry(); } } From 4cd497b05eab3e3b6b89b582bfafde80d42c1518 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Wed, 8 Feb 2023 15:27:18 +0530 Subject: [PATCH 02/25] Fixing lint issues. --- .../java/com/google/cloud/spanner/it/ITClosedSessionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITClosedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITClosedSessionTest.java index 227611a10de..efbffcfa899 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITClosedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITClosedSessionTest.java @@ -252,7 +252,7 @@ public void testTransactionManager() throws InterruptedException { } } catch (AbortedException e) { long retryDelayInMillis = e.getRetryDelayInMillis(); - if(retryDelayInMillis > 0) { + if (retryDelayInMillis > 0) { Thread.sleep(retryDelayInMillis); } txn = manager.resetForRetry(); From 922f324b58ada9963905c466156abba08945e54e Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Tue, 17 Oct 2023 19:29:50 +0530 Subject: [PATCH 03/25] refactor: move session lastUseTime parameter from PooledSession to SessionImpl class. Fix updation of the parameter for chained RPCs within one transaction. --- .../java/com/google/cloud/spanner/SessionImpl.java | 12 ++++++++++++ .../java/com/google/cloud/spanner/SessionPool.java | 11 +++++------ .../google/cloud/spanner/BaseSessionPoolTest.java | 1 + 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 002a00134f6..dd5337fe79b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -29,6 +29,7 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionId; +import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.base.Ticker; @@ -53,6 +54,7 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import javax.annotation.Nullable; +import org.threeten.bp.Instant; /** * Implementation of {@link Session}. Sessions are managed internally by the client library, and @@ -98,12 +100,14 @@ interface SessionTransaction { ByteString readyTransactionId; private final Map options; private Span currentSpan; + private volatile Instant lastUseTime; SessionImpl(SpannerImpl spanner, String name, Map options) { this.spanner = spanner; this.options = options; this.name = checkNotNull(name); this.databaseId = SessionId.of(name).getDatabaseId(); + this.lastUseTime = new Clock().instant(); } @Override @@ -123,6 +127,14 @@ Span getCurrentSpan() { return currentSpan; } + Instant getLastUseTime() { + return lastUseTime; + } + + void markUsed(Instant instant) { + lastUseTime = instant; + } + @Override public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { setActive(null); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index ca61da80583..82008bb69e1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1370,7 +1370,6 @@ PooledSession get(final boolean eligibleForLongRunning) { class PooledSession implements Session { @VisibleForTesting SessionImpl delegate; - private volatile Instant lastUseTime; private volatile SpannerException lastException; private volatile boolean allowReplacing = true; @@ -1409,7 +1408,6 @@ class PooledSession implements Session { private PooledSession(SessionImpl delegate) { this.delegate = delegate; this.state = SessionState.AVAILABLE; - this.lastUseTime = clock.instant(); } int getChannel() { @@ -1631,7 +1629,7 @@ private void markClosing() { } void markUsed() { - lastUseTime = clock.instant(); + delegate.markUsed(clock.instant()); } @Override @@ -1827,7 +1825,7 @@ private void removeIdleSessions(Instant currTime) { Iterator iterator = sessions.descendingIterator(); while (iterator.hasNext()) { PooledSession session = iterator.next(); - if (session.lastUseTime.isBefore(minLastUseTime)) { + if (session.delegate.getLastUseTime().isBefore(minLastUseTime)) { if (session.state != SessionState.CLOSING) { boolean isRemoved = removeFromPool(session); if (isRemoved) { @@ -1929,7 +1927,8 @@ private void removeLongRunningSessions( // collection is populated only when the get() method in {@code PooledSessionFuture} is // called. final PooledSession session = sessionFuture.get(); - final Duration durationFromLastUse = Duration.between(session.lastUseTime, currentTime); + final Duration durationFromLastUse = Duration.between( + session.delegate.getLastUseTime(), currentTime); if (!session.eligibleForLongRunning && durationFromLastUse.compareTo( inactiveTransactionRemovalOptions.getIdleTimeThreshold()) @@ -2327,7 +2326,7 @@ private PooledSession findSessionToKeepAlive( && (numChecked + numAlreadyChecked) < (options.getMinSessions() + options.getMaxIdleSessions() - numSessionsInUse)) { PooledSession session = iterator.next(); - if (session.lastUseTime.isBefore(keepAliveThreshold)) { + if (session.delegate.getLastUseTime().isBefore(keepAliveThreshold)) { iterator.remove(); return session; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java index 7f8cf5cc1b0..e2207aa5d4f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java @@ -80,6 +80,7 @@ SessionImpl mockSession() { .thenReturn(new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance())); when(session.writeAtLeastOnceWithOptions(any(Iterable.class))) .thenReturn(new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance())); + when(session.getLastUseTime()).thenReturn(Instant.now()); sessionIndex++; return session; } From b544080164348ffa010767d2a1d47385fdca3362 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Thu, 19 Oct 2023 18:27:03 +0530 Subject: [PATCH 04/25] chore: add clock instances in callees of SessionImpl. --- .../cloud/spanner/AbstractReadContext.java | 12 +++++++ .../com/google/cloud/spanner/SessionImpl.java | 3 +- .../com/google/cloud/spanner/SessionPool.java | 35 ++++++++++++++++--- .../SessionPoolAsyncTransactionManager.java | 5 +-- .../cloud/spanner/BaseSessionPoolTest.java | 19 +++++++++- .../google/cloud/spanner/SessionPoolTest.java | 10 +++--- 6 files changed, 70 insertions(+), 14 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index d5b1abe0b5b..59a5dfd6078 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -53,6 +53,7 @@ import com.google.spanner.v1.TransactionSelector; import io.opencensus.trace.Span; import io.opencensus.trace.Tracing; +import java.time.Clock; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -73,6 +74,8 @@ abstract static class Builder, T extends AbstractReadCon private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS; private ExecutorProvider executorProvider; + private Clock clock; + Builder() {} @SuppressWarnings("unchecked") @@ -110,6 +113,11 @@ B setExecutorProvider(ExecutorProvider executorProvider) { return self(); } + B setClock(Clock clock) { + this.clock = clock; + return self(); + } + abstract T build(); } @@ -392,6 +400,8 @@ void initTransaction() { private final int defaultPrefetchChunks; private final QueryOptions defaultQueryOptions; + private final Clock clock; + @GuardedBy("lock") private boolean isValid = true; @@ -416,6 +426,8 @@ void initTransaction() { this.defaultQueryOptions = builder.defaultQueryOptions; this.span = builder.span; this.executorProvider = builder.executorProvider; + this.clock = builder.clock; + } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index dd5337fe79b..15961fc8189 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -29,7 +29,6 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionId; -import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.base.Ticker; @@ -107,7 +106,7 @@ interface SessionTransaction { this.options = options; this.name = checkNotNull(name); this.databaseId = SessionId.of(name).getDatabaseId(); - this.lastUseTime = new Clock().instant(); + this.lastUseTime = Instant.now(); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 82008bb69e1..e906635eb54 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -632,10 +632,28 @@ public CursorState tryNext() { static class SessionPoolTransactionContext implements TransactionContext { private final SessionNotFoundHandler handler; final TransactionContext delegate; + private final SessionImpl sessionImpl; + private final Clock clock; - SessionPoolTransactionContext(SessionNotFoundHandler handler, TransactionContext delegate) { + SessionPoolTransactionContext(SessionNotFoundHandler handler, TransactionContext delegate, + SessionImpl sessionImpl) { + this(handler, delegate, sessionImpl, null); + } + + /** + * Constructor accepts a {@link Clock} instance which can be mocked within tests. + * @param handler + * @param delegate + * @param sessionImpl + * @param clock + */ + @VisibleForTesting + SessionPoolTransactionContext(SessionNotFoundHandler handler, TransactionContext delegate, + SessionImpl sessionImpl, Clock clock) { this.handler = Preconditions.checkNotNull(handler); this.delegate = delegate; + this.sessionImpl = sessionImpl; + this.clock = clock == null? new Clock() : clock; } @Override @@ -743,6 +761,8 @@ public ResultSet analyzeUpdateStatement( return delegate.analyzeUpdateStatement(statement, analyzeMode, options); } catch (SessionNotFoundException e) { throw handler.handleSessionNotFound(e); + } finally { + sessionImpl.markUsed(clock.instant()); } } @@ -752,6 +772,8 @@ public long executeUpdate(Statement statement, UpdateOption... options) { return delegate.executeUpdate(statement, options); } catch (SessionNotFoundException e) { throw handler.handleSessionNotFound(e); + } finally { + sessionImpl.markUsed(clock.instant()); } } @@ -772,6 +794,8 @@ public long[] batchUpdate(Iterable statements, UpdateOption... option return delegate.batchUpdate(statements, options); } catch (SessionNotFoundException e) { throw handler.handleSessionNotFound(e); + } finally { + sessionImpl.markUsed(clock.instant()); } } @@ -833,7 +857,8 @@ public TransactionContext begin() { } private TransactionContext internalBegin() { - TransactionContext res = new SessionPoolTransactionContext(this, delegate.begin()); + TransactionContext res = new SessionPoolTransactionContext(this, delegate.begin(), + session.get().delegate); session.get().markUsed(); return res; } @@ -883,11 +908,13 @@ public TransactionContext resetForRetry() { while (true) { try { if (restartedAfterSessionNotFound) { - TransactionContext res = new SessionPoolTransactionContext(this, delegate.begin()); + TransactionContext res = new SessionPoolTransactionContext(this, + delegate.begin(), session.get().delegate); restartedAfterSessionNotFound = false; return res; } else { - return new SessionPoolTransactionContext(this, delegate.resetForRetry()); + return new SessionPoolTransactionContext(this, delegate.resetForRetry(), + session.get().delegate); } } catch (SessionNotFoundException e) { session = sessionPool.replaceSession(e, session); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index b6442fd2182..271c811c4bd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -149,7 +149,8 @@ public void onFailure(Throwable t) { public void onSuccess(TransactionContext result) { delegateTxnFuture.set( new SessionPool.SessionPoolTransactionContext( - SessionPoolAsyncTransactionManager.this, result)); + SessionPoolAsyncTransactionManager.this, result, + session.get().delegate)); } }, MoreExecutors.directExecutor()); @@ -253,7 +254,7 @@ public TransactionContextFuture resetForRetryAsync() { MoreExecutors.directExecutor()), input -> new SessionPool.SessionPoolTransactionContext( - SessionPoolAsyncTransactionManager.this, input), + SessionPoolAsyncTransactionManager.this, input, session.get().delegate), MoreExecutors.directExecutor())); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java index e2207aa5d4f..8833ae4350b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java @@ -80,7 +80,24 @@ SessionImpl mockSession() { .thenReturn(new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance())); when(session.writeAtLeastOnceWithOptions(any(Iterable.class))) .thenReturn(new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance())); - when(session.getLastUseTime()).thenReturn(Instant.now()); + sessionIndex++; + return session; + } + + SessionImpl mockSession(Clock clock) { + final SessionImpl session = mock(SessionImpl.class); + Map options = new HashMap<>(); + options.put(Option.CHANNEL_HINT, channelHint.getAndIncrement()); + when(session.getOptions()).thenReturn(options); + when(session.getName()) + .thenReturn( + "projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex); + when(session.asyncClose()).thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); + when(session.writeWithOptions(any(Iterable.class))) + .thenReturn(new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance())); + when(session.writeAtLeastOnceWithOptions(any(Iterable.class))) + .thenReturn(new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance())); + when(session.getLastUseTime()).thenReturn(clock.instant()); sessionIndex++; return session; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index b8cade68cc0..b517b5933ad 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -641,6 +641,8 @@ public void failOnPoolExhaustion() { @Test public void idleSessionCleanup() throws Exception { + FakeClock clock = new FakeClock(); + clock.currentTimeMillis = System.currentTimeMillis(); options = SessionPoolOptions.newBuilder() .setMinSessions(1) @@ -648,9 +650,9 @@ public void idleSessionCleanup() throws Exception { .setIncStep(1) .setMaxIdleSessions(0) .build(); - SessionImpl session1 = mockSession(); - SessionImpl session2 = mockSession(); - SessionImpl session3 = mockSession(); + SessionImpl session1 = mockSession(clock); + SessionImpl session2 = mockSession(clock); + SessionImpl session3 = mockSession(clock); final LinkedList sessions = new LinkedList<>(Arrays.asList(session1, session2, session3)); doAnswer( @@ -668,8 +670,6 @@ public void idleSessionCleanup() throws Exception { for (SessionImpl session : sessions) { mockKeepAlive(session); } - FakeClock clock = new FakeClock(); - clock.currentTimeMillis = System.currentTimeMillis(); pool = createPool(clock); // Make sure pool has been initialized pool.getSession().close(); From 98622651369f7a7c5ac1a5b515ca758daedbe1bb Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Thu, 26 Oct 2023 19:05:39 +0530 Subject: [PATCH 05/25] chore: partially fix failing unit tests in SessionPoolTest and SessionPoolMaintainerTest. --- .../com/google/cloud/spanner/SessionPool.java | 3 ++ .../cloud/spanner/BaseSessionPoolTest.java | 46 +++++++++++----- .../spanner/SessionPoolMaintainerTest.java | 7 ++- .../cloud/spanner/SessionPoolStressTest.java | 3 +- .../google/cloud/spanner/SessionPoolTest.java | 54 +++++++++++-------- 5 files changed, 75 insertions(+), 38 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index e906635eb54..a2d119b8a37 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1435,6 +1435,9 @@ class PooledSession implements Session { private PooledSession(SessionImpl delegate) { this.delegate = delegate; this.state = SessionState.AVAILABLE; + + // initialise the lastUseTime field for each session. + this.markUsed(); } int getChannel() { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java index 8833ae4350b..0a73c4e8f45 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java @@ -23,11 +23,14 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; import com.google.protobuf.Empty; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; @@ -35,6 +38,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.threeten.bp.Instant; abstract class BaseSessionPoolTest { @@ -84,20 +89,37 @@ SessionImpl mockSession() { return session; } - SessionImpl mockSession(Clock clock) { - final SessionImpl session = mock(SessionImpl.class); + SessionImpl mockSession(ReadContext context) { + SpannerImpl spanner = mock(SpannerImpl.class); Map options = new HashMap<>(); options.put(Option.CHANNEL_HINT, channelHint.getAndIncrement()); - when(session.getOptions()).thenReturn(options); - when(session.getName()) - .thenReturn( - "projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex); - when(session.asyncClose()).thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); - when(session.writeWithOptions(any(Iterable.class))) - .thenReturn(new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance())); - when(session.writeAtLeastOnceWithOptions(any(Iterable.class))) - .thenReturn(new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance())); - when(session.getLastUseTime()).thenReturn(clock.instant()); + final SessionImpl session = + new SessionImpl( + spanner, "projects/dummy/instances/dummy/databases/dummy/sessions/session" + + sessionIndex, options) { + @Override + public ReadContext singleUse(TimestampBound bound) { + // The below stubs are added so that we can mock keep-alive. + return context; + } + + @Override + public ApiFuture asyncClose() { + return ApiFutures.immediateFuture(Empty.getDefaultInstance()); + } + + @Override + public CommitResponse writeAtLeastOnceWithOptions(Iterable mutations, + TransactionOption... transactionOptions) throws SpannerException { + return new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance()); + } + + @Override + public CommitResponse writeWithOptions(Iterable mutations, + TransactionOption... options) throws SpannerException { + return new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance()); + } + }; sessionIndex++; return session; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java index 2f7b14fdadf..55095629a8d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java @@ -76,6 +76,7 @@ public void setUp() { } private void setupMockSessionCreation() { + ReadContext mockContext = mock(ReadContext.class); doAnswer( invocation -> { executor.submit( @@ -84,7 +85,7 @@ private void setupMockSessionCreation() { SessionConsumerImpl consumer = invocation.getArgument(2, SessionConsumerImpl.class); for (int i = 0; i < sessionCount; i++) { - consumer.onSessionReady(setupMockSession(mockSession())); + consumer.onSessionReady(setupMockSession(mockSession(mockContext), mockContext)); } }); return null; @@ -94,10 +95,8 @@ private void setupMockSessionCreation() { Mockito.anyInt(), Mockito.anyBoolean(), any(SessionConsumer.class)); } - private SessionImpl setupMockSession(final SessionImpl session) { - ReadContext mockContext = mock(ReadContext.class); + private SessionImpl setupMockSession(final SessionImpl session, final ReadContext mockContext) { final ResultSet mockResult = mock(ResultSet.class); - when(session.singleUse(any(TimestampBound.class))).thenReturn(mockContext); when(mockContext.executeQuery(any(Statement.class))) .thenAnswer( invocation -> { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java index c9ba9b360af..3fe7855783c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java @@ -89,6 +89,7 @@ public static Collection data() { } private void setupSpanner(DatabaseId db) { + ReadContext context = mock(ReadContext.class); mockSpanner = mock(SpannerImpl.class); spannerOptions = mock(SpannerOptions.class); when(spannerOptions.getNumChannels()).thenReturn(4); @@ -105,7 +106,7 @@ private void setupSpanner(DatabaseId db) { for (int s = 0; s < sessionCount; s++) { SessionImpl session; synchronized (lock) { - session = mockSession(); + session = mockSession(context); setupSession(session); sessions.put(session.getName(), false); if (sessions.size() > maxAliveSessions) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index b517b5933ad..fc56ac0adc2 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -641,8 +641,7 @@ public void failOnPoolExhaustion() { @Test public void idleSessionCleanup() throws Exception { - FakeClock clock = new FakeClock(); - clock.currentTimeMillis = System.currentTimeMillis(); + ReadContext context = mock(ReadContext.class); options = SessionPoolOptions.newBuilder() .setMinSessions(1) @@ -650,9 +649,9 @@ public void idleSessionCleanup() throws Exception { .setIncStep(1) .setMaxIdleSessions(0) .build(); - SessionImpl session1 = mockSession(clock); - SessionImpl session2 = mockSession(clock); - SessionImpl session3 = mockSession(clock); + SessionImpl session1 = mockSession(context); + SessionImpl session2 = mockSession(context); + SessionImpl session3 = mockSession(context); final LinkedList sessions = new LinkedList<>(Arrays.asList(session1, session2, session3)); doAnswer( @@ -667,9 +666,12 @@ public void idleSessionCleanup() throws Exception { }) .when(sessionClient) .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); - for (SessionImpl session : sessions) { - mockKeepAlive(session); - } + + FakeClock clock = new FakeClock(); + clock.currentTimeMillis = System.currentTimeMillis(); + + mockKeepAlive(context); + pool = createPool(clock); // Make sure pool has been initialized pool.getSession().close(); @@ -1034,9 +1036,10 @@ public void longRunningTransactionsCleanup_whenException_doNothing() throws Exce } private void setupForLongRunningTransactionsCleanup() { - SessionImpl session1 = mockSession(); - SessionImpl session2 = mockSession(); - SessionImpl session3 = mockSession(); + ReadContext context = mock(ReadContext.class); + SessionImpl session1 = mockSession(context); + SessionImpl session2 = mockSession(context); + SessionImpl session3 = mockSession(context); final LinkedList sessions = new LinkedList<>(Arrays.asList(session1, session2, session3)); @@ -1053,16 +1056,20 @@ private void setupForLongRunningTransactionsCleanup() { .when(sessionClient) .asyncBatchCreateSessions(Mockito.eq(1), Mockito.anyBoolean(), any(SessionConsumer.class)); - for (SessionImpl session : sessions) { - mockKeepAlive(session); - } + mockKeepAlive(context); } @Test public void keepAlive() throws Exception { + ReadContext context = mock(ReadContext.class); options = SessionPoolOptions.newBuilder().setMinSessions(2).setMaxSessions(3).build(); - final SessionImpl session = mockSession(); - mockKeepAlive(session); + final SessionImpl mockSession1 = mockSession(context); + final SessionImpl mockSession2 = mockSession(context); + final SessionImpl mockSession3 = mockSession(context); + final LinkedList sessions = + new LinkedList<>(Arrays.asList(mockSession1, mockSession2, mockSession3)); + + mockKeepAlive(context); // This is cheating as we are returning the same session each but it makes the verification // easier. doAnswer( @@ -1073,7 +1080,7 @@ public void keepAlive() throws Exception { SessionConsumerImpl consumer = invocation.getArgument(2, SessionConsumerImpl.class); for (int i = 0; i < sessionCount; i++) { - consumer.onSessionReady(session); + consumer.onSessionReady(sessions.pop()); } }); return null; @@ -1090,9 +1097,9 @@ public void keepAlive() throws Exception { session1.close(); session2.close(); runMaintenanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); - verify(session, never()).singleUse(any(TimestampBound.class)); + verify(context, never()).executeQuery(any(Statement.class)); runMaintenanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); - verify(session, times(2)).singleUse(any(TimestampBound.class)); + verify(context, times(2)).executeQuery(Statement.newBuilder("SELECT 1").build()); clock.currentTimeMillis += clock.currentTimeMillis + (options.getKeepAliveIntervalMinutes() + 5) * 60 * 1000; session1 = pool.getSession(); @@ -1100,8 +1107,8 @@ public void keepAlive() throws Exception { session1.close(); runMaintenanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); // The session pool only keeps MinSessions + MaxIdleSessions alive. - verify(session, times(options.getMinSessions() + options.getMaxIdleSessions())) - .singleUse(any(TimestampBound.class)); + verify(context, times(options.getMinSessions() + options.getMaxIdleSessions())) + .executeQuery(Statement.newBuilder("SELECT 1").build()); pool.closeAsync(new SpannerImpl.ClosedException()).get(5L, TimeUnit.SECONDS); } @@ -1799,6 +1806,11 @@ public void testWaitOnMinSessionsThrowsExceptionWhenTimeoutIsReached() { pool.maybeWaitOnMinSessions(); } + private void mockKeepAlive(ReadContext context) { + ResultSet resultSet = mock(ResultSet.class); + when(resultSet.next()).thenReturn(true, false); + when(context.executeQuery(any(Statement.class))).thenReturn(resultSet); + } private void mockKeepAlive(Session session) { ReadContext context = mock(ReadContext.class); ResultSet resultSet = mock(ResultSet.class); From 5e5f7692655b5d399eebf27f53480e354d3c5ae8 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Fri, 27 Oct 2023 15:48:35 +0530 Subject: [PATCH 06/25] chore: fix failing tests in SessionPoolStressTest. --- .../cloud/spanner/SessionPoolStressTest.java | 99 +++++++++++-------- 1 file changed, 56 insertions(+), 43 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java index 3fe7855783c..c86f2a38c5e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java @@ -22,13 +22,16 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionClient.SessionConsumer; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SessionPool.Position; import com.google.cloud.spanner.SessionPool.SessionConsumerImpl; import com.google.cloud.spanner.SessionPoolOptions.ActionOnInactiveTransaction; import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions; +import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; @@ -106,8 +109,8 @@ private void setupSpanner(DatabaseId db) { for (int s = 0; s < sessionCount; s++) { SessionImpl session; synchronized (lock) { - session = mockSession(context); - setupSession(session); + session = getMockedSession(context); + setupSession(session, context); sessions.put(session.getName(), false); if (sessions.size() > maxAliveSessions) { maxAliveSessions = sessions.size(); @@ -124,11 +127,59 @@ private void setupSpanner(DatabaseId db) { .asyncBatchCreateSessions( Mockito.anyInt(), Mockito.anyBoolean(), Mockito.any(SessionConsumer.class)); } + SessionImpl getMockedSession(ReadContext context) { + SpannerImpl spanner = mock(SpannerImpl.class); + Map options = new HashMap<>(); + options.put(Option.CHANNEL_HINT, channelHint.getAndIncrement()); + final SessionImpl session = + new SessionImpl( + spanner, "projects/dummy/instances/dummy/databases/dummy/sessions/session" + + sessionIndex, options) { + @Override + public ReadContext singleUse(TimestampBound bound) { + // The below stubs are added so that we can mock keep-alive. + return context; + } - private void setupSession(final SessionImpl session) { - ReadContext mockContext = mock(ReadContext.class); + @Override + public ApiFuture asyncClose() { + synchronized (lock) { + if (expiredSessions.contains(this.getName())) { + return ApiFutures.immediateFailedFuture( + SpannerExceptionFactoryTest.newSessionNotFoundException(this.getName())); + } + if (sessions.remove(this.getName()) == null) { + setFailed(closedSessions.get(this.getName())); + } + closedSessions.put(this.getName(), new Exception("Session closed at:")); + if (sessions.size() < minSessionsWhenSessionClosed) { + minSessionsWhenSessionClosed = sessions.size(); + } + } + return ApiFutures.immediateFuture(Empty.getDefaultInstance()); + } + + @Override + public void prepareReadWriteTransaction() { + if (random.nextInt(100) < 10) { + expireSession(this); + throw SpannerExceptionFactoryTest.newSessionNotFoundException(this.getName()); + } + String name = this.getName(); + synchronized (lock) { + if (sessions.put(name, true)) { + setFailed(); + } + this.readyTransactionId = ByteString.copyFromUtf8("foo"); + } + } + }; + sessionIndex++; + return session; + } + + private void setupSession(final SessionImpl session, final ReadContext mockContext) { final ResultSet mockResult = mock(ResultSet.class); - when(session.singleUse(any(TimestampBound.class))).thenReturn(mockContext); when(mockContext.executeQuery(any(Statement.class))) .thenAnswer( invocation -> { @@ -136,44 +187,6 @@ private void setupSession(final SessionImpl session) { return mockResult; }); when(mockResult.next()).thenReturn(true); - doAnswer( - invocation -> { - synchronized (lock) { - if (expiredSessions.contains(session.getName())) { - return ApiFutures.immediateFailedFuture( - SpannerExceptionFactoryTest.newSessionNotFoundException(session.getName())); - } - if (sessions.remove(session.getName()) == null) { - setFailed(closedSessions.get(session.getName())); - } - closedSessions.put(session.getName(), new Exception("Session closed at:")); - if (sessions.size() < minSessionsWhenSessionClosed) { - minSessionsWhenSessionClosed = sessions.size(); - } - } - return ApiFutures.immediateFuture(Empty.getDefaultInstance()); - }) - .when(session) - .asyncClose(); - - doAnswer( - invocation -> { - if (random.nextInt(100) < 10) { - expireSession(session); - throw SpannerExceptionFactoryTest.newSessionNotFoundException(session.getName()); - } - String name = session.getName(); - synchronized (lock) { - if (sessions.put(name, true)) { - setFailed(); - } - session.readyTransactionId = ByteString.copyFromUtf8("foo"); - } - return null; - }) - .when(session) - .prepareReadWriteTransaction(); - when(session.hasReadyTransaction()).thenCallRealMethod(); } private void expireSession(Session session) { From a385cebc4a843399075c6309022fe4cd8dd5fad5 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Fri, 27 Oct 2023 18:58:59 +0530 Subject: [PATCH 07/25] chore: update lastUseTime for methods in SessionPoolTransactionContext. Add a couple of unit tests for testing the new behaviour. --- .../com/google/cloud/spanner/SessionPool.java | 56 +++++--- .../cloud/spanner/DatabaseClientImplTest.java | 135 ++++++++++++++++++ 2 files changed, 172 insertions(+), 19 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index a2d119b8a37..f8778225d78 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -640,14 +640,6 @@ static class SessionPoolTransactionContext implements TransactionContext { this(handler, delegate, sessionImpl, null); } - /** - * Constructor accepts a {@link Clock} instance which can be mocked within tests. - * @param handler - * @param delegate - * @param sessionImpl - * @param clock - */ - @VisibleForTesting SessionPoolTransactionContext(SessionNotFoundHandler handler, TransactionContext delegate, SessionImpl sessionImpl, Clock clock) { this.handler = Preconditions.checkNotNull(handler); @@ -659,28 +651,36 @@ static class SessionPoolTransactionContext implements TransactionContext { @Override public ResultSet read( String table, KeySet keys, Iterable columns, ReadOption... options) { - return new SessionPoolResultSet(handler, delegate.read(table, keys, columns, options)); + ResultSet resultSet = new SessionPoolResultSet(handler, delegate.read(table, keys, columns, options)); + sessionImpl.markUsed(clock.instant()); + return resultSet; } @Override public AsyncResultSet readAsync( String table, KeySet keys, Iterable columns, ReadOption... options) { - return new AsyncSessionPoolResultSet( + AsyncResultSet resultSet = new AsyncSessionPoolResultSet( handler, delegate.readAsync(table, keys, columns, options)); + sessionImpl.markUsed(clock.instant()); + return resultSet; } @Override public ResultSet readUsingIndex( String table, String index, KeySet keys, Iterable columns, ReadOption... options) { - return new SessionPoolResultSet( + ResultSet resultSet = new SessionPoolResultSet( handler, delegate.readUsingIndex(table, index, keys, columns, options)); + sessionImpl.markUsed(clock.instant()); + return resultSet; } @Override public AsyncResultSet readUsingIndexAsync( String table, String index, KeySet keys, Iterable columns, ReadOption... options) { - return new AsyncSessionPoolResultSet( + AsyncResultSet resultSet = new AsyncSessionPoolResultSet( handler, delegate.readUsingIndexAsync(table, index, keys, columns, options)); + sessionImpl.markUsed(clock.instant()); + return resultSet; } @Override @@ -689,6 +689,8 @@ public Struct readRow(String table, Key key, Iterable columns) { return delegate.readRow(table, key, columns); } catch (SessionNotFoundException e) { throw handler.handleSessionNotFound(e); + } finally { + sessionImpl.markUsed(clock.instant()); } } @@ -708,6 +710,7 @@ public ApiFuture readRowAsync(String table, Key key, Iterable co @Override public void buffer(Mutation mutation) { delegate.buffer(mutation); + sessionImpl.markUsed(clock.instant()); } @Override @@ -721,6 +724,8 @@ public Struct readRowUsingIndex(String table, String index, Key key, Iterable executeUpdateAsync(Statement statement, UpdateOption... options) { - return ApiFutures.catching( + ApiFuture apiFuture = ApiFutures.catching( delegate.executeUpdateAsync(statement, options), SessionNotFoundException.class, input -> { throw handler.handleSessionNotFound(input); }, MoreExecutors.directExecutor()); + sessionImpl.markUsed(clock.instant()); + return apiFuture; } @Override @@ -802,28 +809,39 @@ public long[] batchUpdate(Iterable statements, UpdateOption... option @Override public ApiFuture batchUpdateAsync( Iterable statements, UpdateOption... options) { - return ApiFutures.catching( + ApiFuture apiFuture = ApiFutures.catching( delegate.batchUpdateAsync(statements, options), SessionNotFoundException.class, input -> { throw handler.handleSessionNotFound(input); }, MoreExecutors.directExecutor()); + sessionImpl.markUsed(clock.instant()); + return apiFuture; } @Override public ResultSet executeQuery(Statement statement, QueryOption... options) { - return new SessionPoolResultSet(handler, delegate.executeQuery(statement, options)); + ResultSet resultSet = + new SessionPoolResultSet(handler, delegate.executeQuery(statement, options)); + sessionImpl.markUsed(clock.instant()); + return resultSet; } @Override public AsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) { - return new AsyncSessionPoolResultSet(handler, delegate.executeQueryAsync(statement, options)); + AsyncResultSet resultSet = + new AsyncSessionPoolResultSet(handler, delegate.executeQueryAsync(statement, options)); + sessionImpl.markUsed(clock.instant()); + return resultSet; } @Override public ResultSet analyzeQuery(Statement statement, QueryAnalyzeMode queryMode) { - return new SessionPoolResultSet(handler, delegate.analyzeQuery(statement, queryMode)); + ResultSet resultSet = + new SessionPoolResultSet(handler, delegate.analyzeQuery(statement, queryMode)); + sessionImpl.markUsed(clock.instant()); + return resultSet; } @Override @@ -858,7 +876,7 @@ public TransactionContext begin() { private TransactionContext internalBegin() { TransactionContext res = new SessionPoolTransactionContext(this, delegate.begin(), - session.get().delegate); + session.get().delegate, sessionPool.clock); session.get().markUsed(); return res; } @@ -909,7 +927,7 @@ public TransactionContext resetForRetry() { try { if (restartedAfterSessionNotFound) { TransactionContext res = new SessionPoolTransactionContext(this, - delegate.begin(), session.get().delegate); + delegate.begin(), session.get().delegate, sessionPool.clock); restartedAfterSessionNotFound = false; return res; } else { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 029ea471cdb..b461cb4afe7 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -536,6 +536,141 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); } + @Test + public void + testPoolMaintainer_whenMultipleReadsExceedingLongRunningThreshold_retainSessionForTransaction() throws Exception { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 3L)) // any session not used for more than 2s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); + + TransactionRunner runner = client.readWriteTransaction(); + runner.run( + transaction -> { + try (ResultSet resultSet = + transaction.read( + READ_TABLE_NAME, + KeySet.singleKey(Key.of(1L)), + READ_COLUMN_NAMES, + Options.priority(RpcPriority.HIGH))) { + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(1050).toMillis(); + + try (ResultSet resultSet = + transaction.read( + READ_TABLE_NAME, + KeySet.singleKey(Key.of(1L)), + READ_COLUMN_NAMES, + Options.priority(RpcPriority.HIGH))) { + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(2050).toMillis(); + + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + return null; + }); + + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + assertNotEquals(endExecutionTime, initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(0, client.pool.numLeakedSessionsRemoved()); + } + + @Test + public void + testPoolMaintainer_whenMultipleUpdatesExceedingLongRunningThreshold_retainSessionForTransaction() throws Exception { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 3L)) // any session not used for more than 2s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + transaction.executeUpdate(UPDATE_STATEMENT); + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(1050).toMillis(); + + transaction.executeUpdate(UPDATE_STATEMENT); + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(2050).toMillis(); + + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + assertNotEquals(endExecutionTime, initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(0, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + } + @Test public void testWrite() { DatabaseClient client = From fd3bb4123498f2ca6ec0a9dde3741f205d36a1ff Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Fri, 27 Oct 2023 19:00:52 +0530 Subject: [PATCH 08/25] chore: lint errors. --- .../cloud/spanner/AbstractReadContext.java | 1 - .../com/google/cloud/spanner/SessionPool.java | 78 +++++++++++-------- .../SessionPoolAsyncTransactionManager.java | 3 +- .../cloud/spanner/BaseSessionPoolTest.java | 17 ++-- .../cloud/spanner/DatabaseClientImplTest.java | 14 +++- .../spanner/SessionPoolMaintainerTest.java | 3 +- .../cloud/spanner/SessionPoolStressTest.java | 7 +- .../google/cloud/spanner/SessionPoolTest.java | 1 + 8 files changed, 71 insertions(+), 53 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 59a5dfd6078..af1130dafb6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -427,7 +427,6 @@ void initTransaction() { this.span = builder.span; this.executorProvider = builder.executorProvider; this.clock = builder.clock; - } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index f8778225d78..5a6efd1f073 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -635,23 +635,27 @@ static class SessionPoolTransactionContext implements TransactionContext { private final SessionImpl sessionImpl; private final Clock clock; - SessionPoolTransactionContext(SessionNotFoundHandler handler, TransactionContext delegate, - SessionImpl sessionImpl) { + SessionPoolTransactionContext( + SessionNotFoundHandler handler, TransactionContext delegate, SessionImpl sessionImpl) { this(handler, delegate, sessionImpl, null); } - SessionPoolTransactionContext(SessionNotFoundHandler handler, TransactionContext delegate, - SessionImpl sessionImpl, Clock clock) { + SessionPoolTransactionContext( + SessionNotFoundHandler handler, + TransactionContext delegate, + SessionImpl sessionImpl, + Clock clock) { this.handler = Preconditions.checkNotNull(handler); this.delegate = delegate; this.sessionImpl = sessionImpl; - this.clock = clock == null? new Clock() : clock; + this.clock = clock == null ? new Clock() : clock; } @Override public ResultSet read( String table, KeySet keys, Iterable columns, ReadOption... options) { - ResultSet resultSet = new SessionPoolResultSet(handler, delegate.read(table, keys, columns, options)); + ResultSet resultSet = + new SessionPoolResultSet(handler, delegate.read(table, keys, columns, options)); sessionImpl.markUsed(clock.instant()); return resultSet; } @@ -659,8 +663,8 @@ public ResultSet read( @Override public AsyncResultSet readAsync( String table, KeySet keys, Iterable columns, ReadOption... options) { - AsyncResultSet resultSet = new AsyncSessionPoolResultSet( - handler, delegate.readAsync(table, keys, columns, options)); + AsyncResultSet resultSet = + new AsyncSessionPoolResultSet(handler, delegate.readAsync(table, keys, columns, options)); sessionImpl.markUsed(clock.instant()); return resultSet; } @@ -668,8 +672,9 @@ public AsyncResultSet readAsync( @Override public ResultSet readUsingIndex( String table, String index, KeySet keys, Iterable columns, ReadOption... options) { - ResultSet resultSet = new SessionPoolResultSet( - handler, delegate.readUsingIndex(table, index, keys, columns, options)); + ResultSet resultSet = + new SessionPoolResultSet( + handler, delegate.readUsingIndex(table, index, keys, columns, options)); sessionImpl.markUsed(clock.instant()); return resultSet; } @@ -677,8 +682,9 @@ public ResultSet readUsingIndex( @Override public AsyncResultSet readUsingIndexAsync( String table, String index, KeySet keys, Iterable columns, ReadOption... options) { - AsyncResultSet resultSet = new AsyncSessionPoolResultSet( - handler, delegate.readUsingIndexAsync(table, index, keys, columns, options)); + AsyncResultSet resultSet = + new AsyncSessionPoolResultSet( + handler, delegate.readUsingIndexAsync(table, index, keys, columns, options)); sessionImpl.markUsed(clock.instant()); return resultSet; } @@ -784,13 +790,14 @@ public long executeUpdate(Statement statement, UpdateOption... options) { @Override public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... options) { - ApiFuture apiFuture = ApiFutures.catching( - delegate.executeUpdateAsync(statement, options), - SessionNotFoundException.class, - input -> { - throw handler.handleSessionNotFound(input); - }, - MoreExecutors.directExecutor()); + ApiFuture apiFuture = + ApiFutures.catching( + delegate.executeUpdateAsync(statement, options), + SessionNotFoundException.class, + input -> { + throw handler.handleSessionNotFound(input); + }, + MoreExecutors.directExecutor()); sessionImpl.markUsed(clock.instant()); return apiFuture; } @@ -809,13 +816,14 @@ public long[] batchUpdate(Iterable statements, UpdateOption... option @Override public ApiFuture batchUpdateAsync( Iterable statements, UpdateOption... options) { - ApiFuture apiFuture = ApiFutures.catching( - delegate.batchUpdateAsync(statements, options), - SessionNotFoundException.class, - input -> { - throw handler.handleSessionNotFound(input); - }, - MoreExecutors.directExecutor()); + ApiFuture apiFuture = + ApiFutures.catching( + delegate.batchUpdateAsync(statements, options), + SessionNotFoundException.class, + input -> { + throw handler.handleSessionNotFound(input); + }, + MoreExecutors.directExecutor()); sessionImpl.markUsed(clock.instant()); return apiFuture; } @@ -875,8 +883,9 @@ public TransactionContext begin() { } private TransactionContext internalBegin() { - TransactionContext res = new SessionPoolTransactionContext(this, delegate.begin(), - session.get().delegate, sessionPool.clock); + TransactionContext res = + new SessionPoolTransactionContext( + this, delegate.begin(), session.get().delegate, sessionPool.clock); session.get().markUsed(); return res; } @@ -926,13 +935,14 @@ public TransactionContext resetForRetry() { while (true) { try { if (restartedAfterSessionNotFound) { - TransactionContext res = new SessionPoolTransactionContext(this, - delegate.begin(), session.get().delegate, sessionPool.clock); + TransactionContext res = + new SessionPoolTransactionContext( + this, delegate.begin(), session.get().delegate, sessionPool.clock); restartedAfterSessionNotFound = false; return res; } else { - return new SessionPoolTransactionContext(this, delegate.resetForRetry(), - session.get().delegate); + return new SessionPoolTransactionContext( + this, delegate.resetForRetry(), session.get().delegate); } } catch (SessionNotFoundException e) { session = sessionPool.replaceSession(e, session); @@ -1975,8 +1985,8 @@ private void removeLongRunningSessions( // collection is populated only when the get() method in {@code PooledSessionFuture} is // called. final PooledSession session = sessionFuture.get(); - final Duration durationFromLastUse = Duration.between( - session.delegate.getLastUseTime(), currentTime); + final Duration durationFromLastUse = + Duration.between(session.delegate.getLastUseTime(), currentTime); if (!session.eligibleForLongRunning && durationFromLastUse.compareTo( inactiveTransactionRemovalOptions.getIdleTimeThreshold()) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index 271c811c4bd..54157ffab1e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -149,7 +149,8 @@ public void onFailure(Throwable t) { public void onSuccess(TransactionContext result) { delegateTxnFuture.set( new SessionPool.SessionPoolTransactionContext( - SessionPoolAsyncTransactionManager.this, result, + SessionPoolAsyncTransactionManager.this, + result, session.get().delegate)); } }, diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java index 0a73c4e8f45..d68b3c5c4cf 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java @@ -30,7 +30,6 @@ import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; import com.google.protobuf.Empty; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ScheduledExecutorService; @@ -38,8 +37,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.threeten.bp.Instant; abstract class BaseSessionPoolTest { @@ -95,8 +92,9 @@ SessionImpl mockSession(ReadContext context) { options.put(Option.CHANNEL_HINT, channelHint.getAndIncrement()); final SessionImpl session = new SessionImpl( - spanner, "projects/dummy/instances/dummy/databases/dummy/sessions/session" - + sessionIndex, options) { + spanner, + "projects/dummy/instances/dummy/databases/dummy/sessions/session" + sessionIndex, + options) { @Override public ReadContext singleUse(TimestampBound bound) { // The below stubs are added so that we can mock keep-alive. @@ -109,14 +107,15 @@ public ApiFuture asyncClose() { } @Override - public CommitResponse writeAtLeastOnceWithOptions(Iterable mutations, - TransactionOption... transactionOptions) throws SpannerException { + public CommitResponse writeAtLeastOnceWithOptions( + Iterable mutations, TransactionOption... transactionOptions) + throws SpannerException { return new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance()); } @Override - public CommitResponse writeWithOptions(Iterable mutations, - TransactionOption... options) throws SpannerException { + public CommitResponse writeWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { return new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance()); } }; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index b461cb4afe7..37afa8a35be 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -538,7 +538,8 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi @Test public void - testPoolMaintainer_whenMultipleReadsExceedingLongRunningThreshold_retainSessionForTransaction() throws Exception { + testPoolMaintainer_whenMultipleReadsExceedingLongRunningThreshold_retainSessionForTransaction() + throws Exception { FakeClock poolMaintainerClock = new FakeClock(); InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = InactiveTransactionRemovalOptions.newBuilder() @@ -603,13 +604,16 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; - assertNotEquals(endExecutionTime, initialExecutionTime); // if session clean up task runs then these timings won't match + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match assertEquals(0, client.pool.numLeakedSessionsRemoved()); } @Test public void - testPoolMaintainer_whenMultipleUpdatesExceedingLongRunningThreshold_retainSessionForTransaction() throws Exception { + testPoolMaintainer_whenMultipleUpdatesExceedingLongRunningThreshold_retainSessionForTransaction() + throws Exception { FakeClock poolMaintainerClock = new FakeClock(); InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = InactiveTransactionRemovalOptions.newBuilder() @@ -666,7 +670,9 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi } Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; - assertNotEquals(endExecutionTime, initialExecutionTime); // if session clean up task runs then these timings won't match + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match assertEquals(0, client.pool.numLeakedSessionsRemoved()); assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java index 55095629a8d..2a088baa3d1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java @@ -85,7 +85,8 @@ private void setupMockSessionCreation() { SessionConsumerImpl consumer = invocation.getArgument(2, SessionConsumerImpl.class); for (int i = 0; i < sessionCount; i++) { - consumer.onSessionReady(setupMockSession(mockSession(mockContext), mockContext)); + consumer.onSessionReady( + setupMockSession(mockSession(mockContext), mockContext)); } }); return null; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java index c86f2a38c5e..8a3a67b2de4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java @@ -24,7 +24,6 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; -import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionClient.SessionConsumer; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SessionPool.Position; @@ -127,14 +126,16 @@ private void setupSpanner(DatabaseId db) { .asyncBatchCreateSessions( Mockito.anyInt(), Mockito.anyBoolean(), Mockito.any(SessionConsumer.class)); } + SessionImpl getMockedSession(ReadContext context) { SpannerImpl spanner = mock(SpannerImpl.class); Map options = new HashMap<>(); options.put(Option.CHANNEL_HINT, channelHint.getAndIncrement()); final SessionImpl session = new SessionImpl( - spanner, "projects/dummy/instances/dummy/databases/dummy/sessions/session" - + sessionIndex, options) { + spanner, + "projects/dummy/instances/dummy/databases/dummy/sessions/session" + sessionIndex, + options) { @Override public ReadContext singleUse(TimestampBound bound) { // The below stubs are added so that we can mock keep-alive. diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index fc56ac0adc2..0c8b9cf2ee4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -1811,6 +1811,7 @@ private void mockKeepAlive(ReadContext context) { when(resultSet.next()).thenReturn(true, false); when(context.executeQuery(any(Statement.class))).thenReturn(resultSet); } + private void mockKeepAlive(Session session) { ReadContext context = mock(ReadContext.class); ResultSet resultSet = mock(ResultSet.class); From 486405318a647b247a0535c007b98ba87e369ce1 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Fri, 27 Oct 2023 19:08:01 +0530 Subject: [PATCH 09/25] chore: fix tests in DatabaseClientImplTest by passing the mocked clock instance. --- .../java/com/google/cloud/spanner/SessionPool.java | 11 +++-------- .../spanner/SessionPoolAsyncTransactionManager.java | 8 ++++++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 5a6efd1f073..45b0e3ac6ab 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -635,11 +635,6 @@ static class SessionPoolTransactionContext implements TransactionContext { private final SessionImpl sessionImpl; private final Clock clock; - SessionPoolTransactionContext( - SessionNotFoundHandler handler, TransactionContext delegate, SessionImpl sessionImpl) { - this(handler, delegate, sessionImpl, null); - } - SessionPoolTransactionContext( SessionNotFoundHandler handler, TransactionContext delegate, @@ -648,7 +643,7 @@ static class SessionPoolTransactionContext implements TransactionContext { this.handler = Preconditions.checkNotNull(handler); this.delegate = delegate; this.sessionImpl = sessionImpl; - this.clock = clock == null ? new Clock() : clock; + this.clock = clock; } @Override @@ -942,7 +937,7 @@ public TransactionContext resetForRetry() { return res; } else { return new SessionPoolTransactionContext( - this, delegate.resetForRetry(), session.get().delegate); + this, delegate.resetForRetry(), session.get().delegate, sessionPool.clock); } } catch (SessionNotFoundException e) { session = sessionPool.replaceSession(e, session); @@ -2064,7 +2059,7 @@ enum Position { private final ExecutorFactory executorFactory; final PoolMaintainer poolMaintainer; - private final Clock clock; + final Clock clock; /** * initialReleasePosition determines where in the pool sessions are added when they are released * into the pool the first time. This is always RANDOM in production, but some tests use FIRST to diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index 54157ffab1e..f64b2348614 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -151,7 +151,8 @@ public void onSuccess(TransactionContext result) { new SessionPool.SessionPoolTransactionContext( SessionPoolAsyncTransactionManager.this, result, - session.get().delegate)); + session.get().delegate, + pool.clock)); } }, MoreExecutors.directExecutor()); @@ -255,7 +256,10 @@ public TransactionContextFuture resetForRetryAsync() { MoreExecutors.directExecutor()), input -> new SessionPool.SessionPoolTransactionContext( - SessionPoolAsyncTransactionManager.this, input, session.get().delegate), + SessionPoolAsyncTransactionManager.this, + input, + session.get().delegate, + pool.clock), MoreExecutors.directExecutor())); } From f5b82fad81e6b3c8cb2fe2c4fbaba76b28e6a6ea Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Sun, 29 Oct 2023 00:55:18 +0530 Subject: [PATCH 10/25] fix: update session lastUseTime field for AbstractReadContext class. Fix the unit test to test this change. --- .../com/google/cloud/spanner/AbstractReadContext.java | 5 +++-- .../main/java/com/google/cloud/spanner/SessionImpl.java | 3 +++ .../com/google/cloud/spanner/TransactionRunnerImpl.java | 8 ++++++++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index af1130dafb6..ec86664426a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -36,6 +36,7 @@ import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; +import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.MoreExecutors; @@ -53,7 +54,6 @@ import com.google.spanner.v1.TransactionSelector; import io.opencensus.trace.Span; import io.opencensus.trace.Tracing; -import java.time.Clock; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -426,7 +426,7 @@ void initTransaction() { this.defaultQueryOptions = builder.defaultQueryOptions; this.span = builder.span; this.executorProvider = builder.executorProvider; - this.clock = builder.clock; + this.clock = builder.clock == null ? new Clock() : builder.clock; } @Override @@ -837,6 +837,7 @@ CloseableIterator startStream(@Nullable ByteString resumeToken SpannerRpc.StreamingCall call = rpc.read( builder.build(), stream.consumer(), session.getOptions(), isRouteToLeader()); + session.markUsed(clock.instant()); call.request(prefetchChunks); stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin()); return stream; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 15961fc8189..301cf38d1cd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -29,6 +29,7 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionId; +import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.base.Ticker; @@ -396,6 +397,7 @@ ApiFuture beginTransactionAsync(Options transactionOptions, boolean } TransactionContextImpl newTransaction(Options options) { + final Clock clock = spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock(); return TransactionContextImpl.newBuilder() .setSession(this) .setOptions(options) @@ -407,6 +409,7 @@ TransactionContextImpl newTransaction(Options options) { .setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks()) .setSpan(currentSpan) .setExecutorProvider(spanner.getAsyncExecutorProvider()) + .setClock(clock) .build(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index ef937e993bd..83bde8baa85 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -30,6 +30,7 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; +import com.google.cloud.spanner.SessionPool.Clock; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -85,12 +86,19 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner { @VisibleForTesting static class TransactionContextImpl extends AbstractReadContext implements TransactionContext { static class Builder extends AbstractReadContext.Builder { + + private Clock clock; private ByteString transactionId; private Options options; private boolean trackTransactionStarter; private Builder() {} + Builder setClock(Clock clock) { + this.clock = clock; + return self(); + } + Builder setTransactionId(ByteString transactionId) { this.transactionId = transactionId; return self(); From 73f01924b81a128a45f58816c5381b8581403b6e Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Sun, 29 Oct 2023 17:14:03 +0530 Subject: [PATCH 11/25] fix: failing tests in TransactionRunnerImplTest. --- .../src/main/java/com/google/cloud/spanner/SessionImpl.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 301cf38d1cd..a207c74ee25 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -397,7 +397,10 @@ ApiFuture beginTransactionAsync(Options transactionOptions, boolean } TransactionContextImpl newTransaction(Options options) { - final Clock clock = spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock(); + final Clock clock = + spanner.getOptions().getSessionPoolOptions() != null + ? spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock() + : null; return TransactionContextImpl.newBuilder() .setSession(this) .setOptions(options) From ff321784c770a69f777016c87bd159a80f7ea0a4 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Sun, 29 Oct 2023 20:34:15 +0530 Subject: [PATCH 12/25] fix: failing test in SessionPoolMaintainerTest. --- .../cloud/spanner/BaseSessionPoolTest.java | 2 +- .../spanner/SessionPoolMaintainerTest.java | 4 ++-- .../google/cloud/spanner/SessionPoolTest.java | 18 +++++++++--------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java index d68b3c5c4cf..3840ee25cd2 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java @@ -86,7 +86,7 @@ SessionImpl mockSession() { return session; } - SessionImpl mockSession(ReadContext context) { + SessionImpl buildMockSession(ReadContext context) { SpannerImpl spanner = mock(SpannerImpl.class); Map options = new HashMap<>(); options.put(Option.CHANNEL_HINT, channelHint.getAndIncrement()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java index 2a088baa3d1..217e214c832 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerTest.java @@ -76,7 +76,6 @@ public void setUp() { } private void setupMockSessionCreation() { - ReadContext mockContext = mock(ReadContext.class); doAnswer( invocation -> { executor.submit( @@ -85,8 +84,9 @@ private void setupMockSessionCreation() { SessionConsumerImpl consumer = invocation.getArgument(2, SessionConsumerImpl.class); for (int i = 0; i < sessionCount; i++) { + ReadContext mockContext = mock(ReadContext.class); consumer.onSessionReady( - setupMockSession(mockSession(mockContext), mockContext)); + setupMockSession(buildMockSession(mockContext), mockContext)); } }); return null; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 0c8b9cf2ee4..ddbb5a68f70 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -649,9 +649,9 @@ public void idleSessionCleanup() throws Exception { .setIncStep(1) .setMaxIdleSessions(0) .build(); - SessionImpl session1 = mockSession(context); - SessionImpl session2 = mockSession(context); - SessionImpl session3 = mockSession(context); + SessionImpl session1 = buildMockSession(context); + SessionImpl session2 = buildMockSession(context); + SessionImpl session3 = buildMockSession(context); final LinkedList sessions = new LinkedList<>(Arrays.asList(session1, session2, session3)); doAnswer( @@ -1037,9 +1037,9 @@ public void longRunningTransactionsCleanup_whenException_doNothing() throws Exce private void setupForLongRunningTransactionsCleanup() { ReadContext context = mock(ReadContext.class); - SessionImpl session1 = mockSession(context); - SessionImpl session2 = mockSession(context); - SessionImpl session3 = mockSession(context); + SessionImpl session1 = buildMockSession(context); + SessionImpl session2 = buildMockSession(context); + SessionImpl session3 = buildMockSession(context); final LinkedList sessions = new LinkedList<>(Arrays.asList(session1, session2, session3)); @@ -1063,9 +1063,9 @@ private void setupForLongRunningTransactionsCleanup() { public void keepAlive() throws Exception { ReadContext context = mock(ReadContext.class); options = SessionPoolOptions.newBuilder().setMinSessions(2).setMaxSessions(3).build(); - final SessionImpl mockSession1 = mockSession(context); - final SessionImpl mockSession2 = mockSession(context); - final SessionImpl mockSession3 = mockSession(context); + final SessionImpl mockSession1 = buildMockSession(context); + final SessionImpl mockSession2 = buildMockSession(context); + final SessionImpl mockSession3 = buildMockSession(context); final LinkedList sessions = new LinkedList<>(Arrays.asList(mockSession1, mockSession2, mockSession3)); From 13165792c8c80bc268a09ea2c0229657897bdacd Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Sun, 29 Oct 2023 21:24:06 +0530 Subject: [PATCH 13/25] refactor: move FakeClock to a new class. --- .../cloud/spanner/BaseSessionPoolTest.java | 11 ------- .../cloud/spanner/DatabaseClientImplTest.java | 10 ------ .../com/google/cloud/spanner/FakeClock.java | 32 +++++++++++++++++++ 3 files changed, 32 insertions(+), 21 deletions(-) create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/FakeClock.java diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java index 3840ee25cd2..d36d32bda2a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java @@ -27,7 +27,6 @@ import com.google.api.core.ApiFutures; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.Options.TransactionOption; -import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; import com.google.protobuf.Empty; import java.util.HashMap; @@ -37,7 +36,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.threeten.bp.Instant; abstract class BaseSessionPoolTest { ScheduledExecutorService mockExecutor; @@ -129,13 +127,4 @@ void runMaintenanceLoop(FakeClock clock, SessionPool pool, long numCycles) { clock.currentTimeMillis += pool.poolMaintainer.loopFrequency; } } - - static class FakeClock extends Clock { - volatile long currentTimeMillis; - - @Override - public Instant instant() { - return Instant.ofEpochMilli(currentTimeMillis); - } - } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 37afa8a35be..1e9282dedbc 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -53,7 +53,6 @@ import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode; -import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SessionPoolOptions.ActionOnInactiveTransaction; import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions; @@ -3693,13 +3692,4 @@ static void assertAsString(ImmutableList expected, ResultSet resultSet, expected.stream().collect(Collectors.joining(",", "[", "]")), resultSet.getValue(col).getAsString()); } - - static class FakeClock extends Clock { - volatile long currentTimeMillis; - - @Override - public Instant instant() { - return Instant.ofEpochMilli(currentTimeMillis); - } - } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FakeClock.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FakeClock.java new file mode 100644 index 00000000000..7a8e2cbc152 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FakeClock.java @@ -0,0 +1,32 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner; + +import com.google.cloud.spanner.SessionPool.Clock; +import org.threeten.bp.Instant; + +/** + * Class which allows to mock {@link Clock} in unit tests and return custom time values within the + * tests. + */ +class FakeClock extends Clock { + volatile long currentTimeMillis; + + @Override + public Instant instant() { + return Instant.ofEpochMilli(currentTimeMillis); + } +} From 80dd9713b07b51e211fb9d0172a5425a49c6fc66 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Mon, 30 Oct 2023 12:36:08 +0530 Subject: [PATCH 14/25] refactor: move Clock to a new class. --- .../cloud/spanner/AbstractReadContext.java | 1 - .../java/com/google/cloud/spanner/Clock.java | 29 +++++++++++++++++++ .../com/google/cloud/spanner/SessionImpl.java | 1 - .../com/google/cloud/spanner/SessionPool.java | 10 ------- .../cloud/spanner/SessionPoolOptions.java | 1 - .../cloud/spanner/TransactionRunnerImpl.java | 1 - .../com/google/cloud/spanner/FakeClock.java | 1 - .../google/cloud/spanner/SessionPoolTest.java | 1 - 8 files changed, 29 insertions(+), 16 deletions(-) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/Clock.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index ec86664426a..24a5bca8a06 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -36,7 +36,6 @@ import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; -import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.MoreExecutors; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Clock.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Clock.java new file mode 100644 index 00000000000..bb3507eeb48 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Clock.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import org.threeten.bp.Instant; + +/** + * Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8 + * Clock. + */ +class Clock { + Instant instant() { + return Instant.now(); + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index a207c74ee25..9f788af462e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -29,7 +29,6 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionId; -import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.base.Ticker; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 45b0e3ac6ab..f3dca32a2ca 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -144,16 +144,6 @@ void maybeWaitOnMinSessions() { } } - /** - * Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8 - * Clock. - */ - static class Clock { - Instant instant() { - return Instant.now(); - } - } - private abstract static class CachedResultSetSupplier implements Supplier { private ResultSet cached; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index e29767abab3..cbea1495368 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -16,7 +16,6 @@ package com.google.cloud.spanner; -import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.SessionPool.Position; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 83bde8baa85..efd30a9ce48 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -30,7 +30,6 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; -import com.google.cloud.spanner.SessionPool.Clock; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FakeClock.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FakeClock.java index 7a8e2cbc152..4bee3cc18e1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FakeClock.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/FakeClock.java @@ -15,7 +15,6 @@ */ package com.google.cloud.spanner; -import com.google.cloud.spanner.SessionPool.Clock; import org.threeten.bp.Instant; /** diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index ddbb5a68f70..65c7c7c03c1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -51,7 +51,6 @@ import com.google.cloud.spanner.MetricRegistryTestUtils.PointWithFunction; import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode; import com.google.cloud.spanner.SessionClient.SessionConsumer; -import com.google.cloud.spanner.SessionPool.Clock; import com.google.cloud.spanner.SessionPool.PooledSession; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SessionPool.Position; From 1acd645aed581e81be686b662f07a7339f7addfe Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Tue, 31 Oct 2023 13:46:28 +0530 Subject: [PATCH 15/25] chore: resolving PR comments. --- .../java/com/google/cloud/spanner/AbstractReadContext.java | 6 +++--- .../main/java/com/google/cloud/spanner/SessionImpl.java | 5 +---- .../main/java/com/google/cloud/spanner/SessionPool.java | 1 - .../google/cloud/spanner/TransactionRunnerImplTest.java | 7 ++++++- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 24a5bca8a06..158116e42c4 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner; import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; +import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -72,7 +73,6 @@ abstract static class Builder, T extends AbstractReadCon private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS; private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS; private ExecutorProvider executorProvider; - private Clock clock; Builder() {} @@ -399,7 +399,7 @@ void initTransaction() { private final int defaultPrefetchChunks; private final QueryOptions defaultQueryOptions; - private final Clock clock; + private Clock clock = new Clock(); @GuardedBy("lock") private boolean isValid = true; @@ -425,7 +425,7 @@ void initTransaction() { this.defaultQueryOptions = builder.defaultQueryOptions; this.span = builder.span; this.executorProvider = builder.executorProvider; - this.clock = builder.clock == null ? new Clock() : builder.clock; + this.clock = firstNonNull(builder.clock, this.clock); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 9f788af462e..20a32c6abf8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -396,10 +396,7 @@ ApiFuture beginTransactionAsync(Options transactionOptions, boolean } TransactionContextImpl newTransaction(Options options) { - final Clock clock = - spanner.getOptions().getSessionPoolOptions() != null - ? spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock() - : null; + final Clock clock = spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock(); return TransactionContextImpl.newBuilder() .setSession(this) .setOptions(options) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index f3dca32a2ca..2a53360c665 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -701,7 +701,6 @@ public ApiFuture readRowAsync(String table, Key key, Iterable co @Override public void buffer(Mutation mutation) { delegate.buffer(mutation); - sessionImpl.markUsed(clock.instant()); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index e13b7b75093..efc57fb480b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -273,10 +273,15 @@ public void batchDmlFailedPrecondition() { @Test public void inlineBegin() { SpannerImpl spanner = mock(SpannerImpl.class); + SpannerOptions options = mock(SpannerOptions.class); + when(spanner.getRpc()).thenReturn(rpc); when(spanner.getDefaultQueryOptions(Mockito.any(DatabaseId.class))) .thenReturn(QueryOptions.getDefaultInstance()); - when(spanner.getOptions()).thenReturn(mock(SpannerOptions.class)); + when(spanner.getOptions()).thenReturn(options); + SessionPoolOptions sessionPoolOptions = SessionPoolOptions.newBuilder().build(); + when(options.getSessionPoolOptions()).thenReturn(sessionPoolOptions); + SessionImpl session = new SessionImpl( spanner, "projects/p/instances/i/databases/d/sessions/s", Collections.EMPTY_MAP) { From 6af8187f09057895f616a6ede353b73f71a4bfaf Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Tue, 31 Oct 2023 14:30:08 +0530 Subject: [PATCH 16/25] chore: address review comments. --- .../com/google/cloud/spanner/DatabaseClientImplTest.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 1e9282dedbc..edfcc4a9126 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -544,7 +544,7 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi InactiveTransactionRemovalOptions.newBuilder() .setIdleTimeThreshold( Duration.ofSeconds( - 3L)) // any session not used for more than 2s will be long-running + 3L)) // any session not used for more than 3s will be long-running .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s .build(); @@ -570,8 +570,6 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; - poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); - TransactionRunner runner = client.readWriteTransaction(); runner.run( transaction -> { @@ -618,7 +616,7 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi InactiveTransactionRemovalOptions.newBuilder() .setIdleTimeThreshold( Duration.ofSeconds( - 3L)) // any session not used for more than 2s will be long-running + 3L)) // any session not used for more than 3s will be long-running .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s .build(); @@ -644,8 +642,6 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; - poolMaintainerClock.currentTimeMillis += Duration.ofMinutes(3).toMillis(); - try (TransactionManager manager = client.transactionManager()) { TransactionContext transaction = manager.begin(); while (true) { From 999a39b317e1740b001b7e89755e160469498883 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Tue, 31 Oct 2023 15:41:19 +0530 Subject: [PATCH 17/25] chore: updating lastUseTime state in TransactionRunnerImpl. Removing redundant updates from SessionPool class. --- .../com/google/cloud/spanner/SessionPool.java | 48 ++++------- .../cloud/spanner/TransactionRunnerImpl.java | 23 ++++-- .../cloud/spanner/DatabaseClientImplTest.java | 81 +++++++++++++++++++ 3 files changed, 112 insertions(+), 40 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 2a53360c665..4a2a7054c34 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -639,39 +639,28 @@ static class SessionPoolTransactionContext implements TransactionContext { @Override public ResultSet read( String table, KeySet keys, Iterable columns, ReadOption... options) { - ResultSet resultSet = - new SessionPoolResultSet(handler, delegate.read(table, keys, columns, options)); - sessionImpl.markUsed(clock.instant()); - return resultSet; + return new SessionPoolResultSet(handler, delegate.read(table, keys, columns, options)); } @Override public AsyncResultSet readAsync( String table, KeySet keys, Iterable columns, ReadOption... options) { - AsyncResultSet resultSet = - new AsyncSessionPoolResultSet(handler, delegate.readAsync(table, keys, columns, options)); - sessionImpl.markUsed(clock.instant()); - return resultSet; + return new AsyncSessionPoolResultSet( + handler, delegate.readAsync(table, keys, columns, options)); } @Override public ResultSet readUsingIndex( String table, String index, KeySet keys, Iterable columns, ReadOption... options) { - ResultSet resultSet = - new SessionPoolResultSet( - handler, delegate.readUsingIndex(table, index, keys, columns, options)); - sessionImpl.markUsed(clock.instant()); - return resultSet; + return new SessionPoolResultSet( + handler, delegate.readUsingIndex(table, index, keys, columns, options)); } @Override public AsyncResultSet readUsingIndexAsync( String table, String index, KeySet keys, Iterable columns, ReadOption... options) { - AsyncResultSet resultSet = - new AsyncSessionPoolResultSet( - handler, delegate.readUsingIndexAsync(table, index, keys, columns, options)); - sessionImpl.markUsed(clock.instant()); - return resultSet; + return new AsyncSessionPoolResultSet( + handler, delegate.readUsingIndexAsync(table, index, keys, columns, options)); } @Override @@ -680,8 +669,6 @@ public Struct readRow(String table, Key key, Iterable columns) { return delegate.readRow(table, key, columns); } catch (SessionNotFoundException e) { throw handler.handleSessionNotFound(e); - } finally { - sessionImpl.markUsed(clock.instant()); } } @@ -714,8 +701,6 @@ public Struct readRowUsingIndex(String table, String index, Key key, Iterable executeUpdateAsync(Statement statement, UpdateOption... options) { - ApiFuture apiFuture = - ApiFutures.catching( - delegate.executeUpdateAsync(statement, options), - SessionNotFoundException.class, - input -> { - throw handler.handleSessionNotFound(input); - }, - MoreExecutors.directExecutor()); - sessionImpl.markUsed(clock.instant()); - return apiFuture; + return ApiFutures.catching( + delegate.executeUpdateAsync(statement, options), + SessionNotFoundException.class, + input -> { + throw handler.handleSessionNotFound(input); + }, + MoreExecutors.directExecutor()); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index efd30a9ce48..44759438bf4 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -18,6 +18,7 @@ import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerBatchUpdateException; import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; +import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -196,6 +197,7 @@ public void removeListener(Runnable listener) { volatile ByteString transactionId; private CommitResponse commitResponse; + private Clock clock = new Clock(); private TransactionContextImpl(Builder builder) { super(builder); @@ -203,6 +205,7 @@ private TransactionContextImpl(Builder builder) { this.trackTransactionStarter = builder.trackTransactionStarter; this.options = builder.options; this.finishedAsyncOperations.set(null); + this.clock = firstNonNull(builder.clock, this.clock); } @Override @@ -396,6 +399,7 @@ public void run() { tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span).startSpan(); final ApiFuture commitFuture = rpc.commitAsync(commitRequest, session.getOptions()); + session.markUsed(clock.instant()); commitFuture.addListener( tracer.withSpan( opSpan, @@ -470,12 +474,15 @@ ApiFuture rollbackAsync() { // is still in flight. That transaction will then automatically be terminated by the server. if (transactionId != null) { span.addAnnotation("Starting Rollback"); - return rpc.rollbackAsync( - RollbackRequest.newBuilder() - .setSession(session.getName()) - .setTransactionId(transactionId) - .build(), - session.getOptions()); + ApiFuture apiFuture = + rpc.rollbackAsync( + RollbackRequest.newBuilder() + .setSession(session.getName()) + .setTransactionId(transactionId) + .build(), + session.getOptions()); + session.markUsed(clock.instant()); + return apiFuture; } else { return ApiFutures.immediateFuture(Empty.getDefaultInstance()); } @@ -730,6 +737,7 @@ private ResultSet internalExecuteUpdate( try { com.google.spanner.v1.ResultSet resultSet = rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader()); + session.markUsed(clock.instant()); if (resultSet.getMetadata().hasTransaction()) { onTransactionMetadata( resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin()); @@ -760,6 +768,7 @@ public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... o // commit. increaseAsyncOperations(); resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), isRouteToLeader()); + session.markUsed(clock.instant()); } catch (Throwable t) { decreaseAsyncOperations(); throw t; @@ -831,6 +840,7 @@ public long[] batchUpdate(Iterable statements, UpdateOption... option try { com.google.spanner.v1.ExecuteBatchDmlResponse response = rpc.executeBatchDml(builder.build(), session.getOptions()); + session.markUsed(clock.instant()); long[] results = new long[response.getResultSetsCount()]; for (int i = 0; i < response.getResultSetsCount(); ++i) { results[i] = response.getResultSets(i).getStats().getRowCountExact(); @@ -870,6 +880,7 @@ public ApiFuture batchUpdateAsync( // commit. increaseAsyncOperations(); response = rpc.executeBatchDmlAsync(builder.build(), session.getOptions()); + session.markUsed(clock.instant()); } catch (Throwable t) { decreaseAsyncOperations(); throw t; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index edfcc4a9126..ed18e9327c7 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -672,6 +672,87 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); } + @Test + public void + testPoolMaintainer_whenLongRunningReadsUsingTransactionManager_retainSessionForTransaction() + throws Exception { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 3L)) // any session not used for more than 3s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + try (ResultSet resultSet = + transaction.read( + READ_TABLE_NAME, + KeySet.singleKey(Key.of(1L)), + READ_COLUMN_NAMES, + Options.priority(RpcPriority.HIGH))) { + + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(1050).toMillis(); + + try (ResultSet resultSet = + transaction.read( + READ_TABLE_NAME, + KeySet.singleKey(Key.of(1L)), + READ_COLUMN_NAMES, + Options.priority(RpcPriority.HIGH))) { + + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(2050).toMillis(); + + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(0, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + } + @Test public void testWrite() { DatabaseClient client = From ec80d6ad07ba0c4760a4c27c757caab33acb2805 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Wed, 1 Nov 2023 15:28:59 +0530 Subject: [PATCH 18/25] chore: remove redundant update statements from SessionPool class. Add more unit tests. --- .../cloud/spanner/AbstractReadContext.java | 1 + .../com/google/cloud/spanner/SessionPool.java | 36 +- .../cloud/spanner/DatabaseClientImplTest.java | 476 +++++++++++++++++- 3 files changed, 485 insertions(+), 28 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 158116e42c4..c92475202ff 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -699,6 +699,7 @@ CloseableIterator startStream(@Nullable ByteString resumeToken SpannerRpc.StreamingCall call = rpc.executeQuery( request.build(), stream.consumer(), session.getOptions(), isRouteToLeader()); + session.markUsed(clock.instant()); call.request(prefetchChunks); stream.setCall(call, request.getTransaction().hasBegin()); return stream; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 4a2a7054c34..4e7102a205e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -741,8 +741,6 @@ public ResultSet analyzeUpdateStatement( return delegate.analyzeUpdateStatement(statement, analyzeMode, options); } catch (SessionNotFoundException e) { throw handler.handleSessionNotFound(e); - } finally { - sessionImpl.markUsed(clock.instant()); } } @@ -772,48 +770,34 @@ public long[] batchUpdate(Iterable statements, UpdateOption... option return delegate.batchUpdate(statements, options); } catch (SessionNotFoundException e) { throw handler.handleSessionNotFound(e); - } finally { - sessionImpl.markUsed(clock.instant()); } } @Override public ApiFuture batchUpdateAsync( Iterable statements, UpdateOption... options) { - ApiFuture apiFuture = - ApiFutures.catching( - delegate.batchUpdateAsync(statements, options), - SessionNotFoundException.class, - input -> { - throw handler.handleSessionNotFound(input); - }, - MoreExecutors.directExecutor()); - sessionImpl.markUsed(clock.instant()); - return apiFuture; + return ApiFutures.catching( + delegate.batchUpdateAsync(statements, options), + SessionNotFoundException.class, + input -> { + throw handler.handleSessionNotFound(input); + }, + MoreExecutors.directExecutor()); } @Override public ResultSet executeQuery(Statement statement, QueryOption... options) { - ResultSet resultSet = - new SessionPoolResultSet(handler, delegate.executeQuery(statement, options)); - sessionImpl.markUsed(clock.instant()); - return resultSet; + return new SessionPoolResultSet(handler, delegate.executeQuery(statement, options)); } @Override public AsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) { - AsyncResultSet resultSet = - new AsyncSessionPoolResultSet(handler, delegate.executeQueryAsync(statement, options)); - sessionImpl.markUsed(clock.instant()); - return resultSet; + return new AsyncSessionPoolResultSet(handler, delegate.executeQueryAsync(statement, options)); } @Override public ResultSet analyzeQuery(Statement statement, QueryAnalyzeMode queryMode) { - ResultSet resultSet = - new SessionPoolResultSet(handler, delegate.analyzeQuery(statement, queryMode)); - sessionImpl.markUsed(clock.instant()); - return resultSet; + return new SessionPoolResultSet(handler, delegate.analyzeQuery(statement, queryMode)); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index ed18e9327c7..4a55c00d869 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -63,6 +63,7 @@ import com.google.cloud.spanner.connection.RandomResultSetGenerator; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.io.BaseEncoding; import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.AbstractMessage; @@ -674,8 +675,7 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi @Test public void - testPoolMaintainer_whenLongRunningReadsUsingTransactionManager_retainSessionForTransaction() - throws Exception { + testPoolMaintainer_whenLongRunningReadsUsingTransactionManager_retainSessionForTransaction() { FakeClock poolMaintainerClock = new FakeClock(); InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = InactiveTransactionRemovalOptions.newBuilder() @@ -753,6 +753,478 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); } + @Test + public void + testPoolMaintainer_whenLongRunningReadRowUsingTransactionManager_retainSessionForTransaction() { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 3L)) // any session not used for more than 3s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + transaction.readRow(READ_TABLE_NAME, Key.of(1L), READ_COLUMN_NAMES); + + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(1050).toMillis(); + + transaction.readRow(READ_TABLE_NAME, Key.of(1L), READ_COLUMN_NAMES); + + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(2050).toMillis(); + + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(0, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + } + + @Test + public void + testPoolMaintainer_whenLongRunningAnalyzeUpdateStatementUsingTransactionManager_retainSessionForTransaction() { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 3L)) // any session not used for more than 3s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + try (ResultSet resultSet = + transaction.analyzeUpdateStatement(UPDATE_STATEMENT, QueryAnalyzeMode.PROFILE); ) { + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(1050).toMillis(); + + try (ResultSet resultSet = + transaction.analyzeUpdateStatement(UPDATE_STATEMENT, QueryAnalyzeMode.PROFILE); ) { + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(2050).toMillis(); + + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(0, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + } + + @Test + public void + testPoolMaintainer_whenLongRunningBatchUpdatesUsingTransactionManager_retainSessionForTransaction() { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 3L)) // any session not used for more than 3s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + transaction.batchUpdate(Lists.newArrayList(UPDATE_STATEMENT)); + + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(1050).toMillis(); + + transaction.batchUpdate(Lists.newArrayList(UPDATE_STATEMENT)); + + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(2050).toMillis(); + + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(0, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + } + + @Test + public void + testPoolMaintainer_whenLongRunningBatchUpdatesAsyncUsingTransactionManager_retainSessionForTransaction() { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 3L)) // any session not used for more than 3s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + transaction.batchUpdateAsync(Lists.newArrayList(UPDATE_STATEMENT)); + + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(1050).toMillis(); + + transaction.batchUpdateAsync(Lists.newArrayList(UPDATE_STATEMENT)); + + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(2050).toMillis(); + + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(0, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + } + + @Test + public void + testPoolMaintainer_whenLongRunningExecuteQueryUsingTransactionManager_retainSessionForTransaction() { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 3L)) // any session not used for more than 3s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + try (ResultSet resultSet = transaction.executeQuery(SELECT1)) { + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(1050).toMillis(); + + try (ResultSet resultSet = transaction.executeQuery(SELECT1)) { + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(2050).toMillis(); + + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(0, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + } + + @Test + public void + testPoolMaintainer_whenLongRunningExecuteQueryAsyncUsingTransactionManager_retainSessionForTransaction() { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 3L)) // any session not used for more than 3s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + try (ResultSet resultSet = transaction.executeQueryAsync(SELECT1)) { + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(1050).toMillis(); + + try (ResultSet resultSet = transaction.executeQueryAsync(SELECT1)) { + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(2050).toMillis(); + + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(0, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + } + + @Test + public void + testPoolMaintainer_whenLongRunningAnalyzeQueryUsingTransactionManager_retainSessionForTransaction() { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 3L)) // any session not used for more than 3s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + try (TransactionManager manager = client.transactionManager()) { + TransactionContext transaction = manager.begin(); + while (true) { + try { + try (ResultSet resultSet = transaction.analyzeQuery(SELECT1, QueryAnalyzeMode.PROFILE)) { + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(1050).toMillis(); + + try (ResultSet resultSet = transaction.analyzeQuery(SELECT1, QueryAnalyzeMode.PROFILE)) { + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(2050).toMillis(); + + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); + } + } + } + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(0, client.pool.numLeakedSessionsRemoved()); + assertTrue(client.pool.getNumberOfSessionsInPool() <= client.pool.totalSessions()); + } + @Test public void testWrite() { DatabaseClient client = From 6cdef813fa4dfd020583e92d1b90bd8484913140 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Wed, 1 Nov 2023 15:42:12 +0530 Subject: [PATCH 19/25] chore: add more tests for TransactionRunner. --- .../cloud/spanner/DatabaseClientImplTest.java | 66 ++++++++++++++++++- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 4a55c00d869..aea8a4dcb64 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -538,7 +538,7 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi @Test public void - testPoolMaintainer_whenMultipleReadsExceedingLongRunningThreshold_retainSessionForTransaction() + testPoolMaintainer_whenLongRunningReadsUsingTransactionRunner_retainSessionForTransaction() throws Exception { FakeClock poolMaintainerClock = new FakeClock(); InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = @@ -610,7 +610,69 @@ public void testPoolMaintainer_whenPDMLFollowedByInactiveTransaction_removeSessi @Test public void - testPoolMaintainer_whenMultipleUpdatesExceedingLongRunningThreshold_retainSessionForTransaction() + testPoolMaintainer_whenLongRunningQueriesUsingTransactionRunner_retainSessionForTransaction() + throws Exception { + FakeClock poolMaintainerClock = new FakeClock(); + InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = + InactiveTransactionRemovalOptions.newBuilder() + .setIdleTimeThreshold( + Duration.ofSeconds( + 3L)) // any session not used for more than 3s will be long-running + .setActionOnInactiveTransaction(ActionOnInactiveTransaction.CLOSE) + .setExecutionFrequency(Duration.ofSeconds(1)) // execute thread every 1s + .build(); + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMinSessions(1) + .setMaxSessions(1) // to ensure there is 1 session and pool is 100% utilized + .setInactiveTransactionRemovalOptions(inactiveTransactionRemovalOptions) + .setLoopFrequency(1000L) // main thread runs every 1s + .setPoolMaintainerClock(poolMaintainerClock) + .build(); + spanner = + SpannerOptions.newBuilder() + .setProjectId(TEST_PROJECT) + .setDatabaseRole(TEST_DATABASE_ROLE) + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(sessionPoolOptions) + .build() + .getService(); + DatabaseClientImpl client = + (DatabaseClientImpl) + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Instant initialExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + TransactionRunner runner = client.readWriteTransaction(); + runner.run( + transaction -> { + try (ResultSet resultSet = transaction.executeQuery(SELECT1)) { + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(1050).toMillis(); + + try (ResultSet resultSet = transaction.executeQuery(SELECT1)) { + while (resultSet.next()) {} + } + poolMaintainerClock.currentTimeMillis += Duration.ofMillis(2050).toMillis(); + + // force trigger pool maintainer to check for long-running sessions + client.pool.poolMaintainer.maintainPool(); + + return null; + }); + + Instant endExecutionTime = client.pool.poolMaintainer.lastExecutionTime; + + assertNotEquals( + endExecutionTime, + initialExecutionTime); // if session clean up task runs then these timings won't match + assertEquals(0, client.pool.numLeakedSessionsRemoved()); + } + + @Test + public void + testPoolMaintainer_whenLongRunningUpdatesUsingTransactionManager_retainSessionForTransaction() throws Exception { FakeClock poolMaintainerClock = new FakeClock(); InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions = From 593a10bd937636cc04eaa848ecaf3b741eb740dc Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Wed, 1 Nov 2023 16:02:33 +0530 Subject: [PATCH 20/25] chore: remove dead code from constructor of SessionPoolTransactionContext. --- .../com/google/cloud/spanner/SessionPool.java | 25 +++++-------------- .../SessionPoolAsyncTransactionManager.java | 10 ++------ 2 files changed, 8 insertions(+), 27 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 4e7102a205e..54a0a292cd8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -622,18 +622,10 @@ public CursorState tryNext() { static class SessionPoolTransactionContext implements TransactionContext { private final SessionNotFoundHandler handler; final TransactionContext delegate; - private final SessionImpl sessionImpl; - private final Clock clock; - - SessionPoolTransactionContext( - SessionNotFoundHandler handler, - TransactionContext delegate, - SessionImpl sessionImpl, - Clock clock) { + + SessionPoolTransactionContext(SessionNotFoundHandler handler, TransactionContext delegate) { this.handler = Preconditions.checkNotNull(handler); this.delegate = delegate; - this.sessionImpl = sessionImpl; - this.clock = clock; } @Override @@ -831,9 +823,7 @@ public TransactionContext begin() { } private TransactionContext internalBegin() { - TransactionContext res = - new SessionPoolTransactionContext( - this, delegate.begin(), session.get().delegate, sessionPool.clock); + TransactionContext res = new SessionPoolTransactionContext(this, delegate.begin()); session.get().markUsed(); return res; } @@ -883,14 +873,11 @@ public TransactionContext resetForRetry() { while (true) { try { if (restartedAfterSessionNotFound) { - TransactionContext res = - new SessionPoolTransactionContext( - this, delegate.begin(), session.get().delegate, sessionPool.clock); + TransactionContext res = new SessionPoolTransactionContext(this, delegate.begin()); restartedAfterSessionNotFound = false; return res; } else { - return new SessionPoolTransactionContext( - this, delegate.resetForRetry(), session.get().delegate, sessionPool.clock); + return new SessionPoolTransactionContext(this, delegate.resetForRetry()); } } catch (SessionNotFoundException e) { session = sessionPool.replaceSession(e, session); @@ -2012,7 +1999,7 @@ enum Position { private final ExecutorFactory executorFactory; final PoolMaintainer poolMaintainer; - final Clock clock; + private final Clock clock; /** * initialReleasePosition determines where in the pool sessions are added when they are released * into the pool the first time. This is always RANDOM in production, but some tests use FIRST to diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index f64b2348614..b6442fd2182 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -149,10 +149,7 @@ public void onFailure(Throwable t) { public void onSuccess(TransactionContext result) { delegateTxnFuture.set( new SessionPool.SessionPoolTransactionContext( - SessionPoolAsyncTransactionManager.this, - result, - session.get().delegate, - pool.clock)); + SessionPoolAsyncTransactionManager.this, result)); } }, MoreExecutors.directExecutor()); @@ -256,10 +253,7 @@ public TransactionContextFuture resetForRetryAsync() { MoreExecutors.directExecutor()), input -> new SessionPool.SessionPoolTransactionContext( - SessionPoolAsyncTransactionManager.this, - input, - session.get().delegate, - pool.clock), + SessionPoolAsyncTransactionManager.this, input), MoreExecutors.directExecutor())); } From ced1e061d5e755103ac844615c5479cb734a2649 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 1 Nov 2023 13:04:53 +0000 Subject: [PATCH 21/25] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 169541eff05..7feed7d72cd 100644 --- a/README.md +++ b/README.md @@ -50,20 +50,20 @@ If you are using Maven without the BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.24.0') +implementation platform('com.google.cloud:libraries-bom:26.26.0') implementation 'com.google.cloud:google-cloud-spanner' ``` If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-spanner:6.49.0' +implementation 'com.google.cloud:google-cloud-spanner:6.52.1' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.49.0" +libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.52.1" ``` @@ -432,7 +432,7 @@ Java is a registered trademark of Oracle and/or its affiliates. [kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-spanner/java11.html [stability-image]: https://img.shields.io/badge/stability-stable-green [maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-spanner.svg -[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.49.0 +[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.52.1 [authentication]: https://github.com/googleapis/google-cloud-java#authentication [auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes [predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles From 0485aee9916cc7c5d507d9c9f7afc0fff312676f Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Thu, 2 Nov 2023 17:58:55 +0530 Subject: [PATCH 22/25] Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Knut Olav Løite --- .../java/com/google/cloud/spanner/TransactionRunnerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 44759438bf4..9f24568accf 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -95,7 +95,7 @@ static class Builder extends AbstractReadContext.Builder Date: Thu, 2 Nov 2023 17:59:09 +0530 Subject: [PATCH 23/25] Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Knut Olav Løite --- .../java/com/google/cloud/spanner/TransactionRunnerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 9f24568accf..aab8c46e74a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -87,7 +87,7 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner { static class TransactionContextImpl extends AbstractReadContext implements TransactionContext { static class Builder extends AbstractReadContext.Builder { - private Clock clock; + private Clock clock = new Clock(); private ByteString transactionId; private Options options; private boolean trackTransactionStarter; From b75b19f28d4c97b8431d4ed46debd6ff62df87dc Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Thu, 2 Nov 2023 17:59:19 +0530 Subject: [PATCH 24/25] Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Knut Olav Løite --- .../java/com/google/cloud/spanner/TransactionRunnerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index aab8c46e74a..f675272ba06 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -197,7 +197,7 @@ public void removeListener(Runnable listener) { volatile ByteString transactionId; private CommitResponse commitResponse; - private Clock clock = new Clock(); + private final Clock clock; private TransactionContextImpl(Builder builder) { super(builder); From 86327e2be88e2d79194c0fc86b5e9ac5c8f35447 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Thu, 2 Nov 2023 18:29:09 +0530 Subject: [PATCH 25/25] chore: fixing precondition errors due to null clock. --- .../com/google/cloud/spanner/AbstractReadContext.java | 10 +++++----- .../java/com/google/cloud/spanner/SessionImpl.java | 6 ++++-- .../google/cloud/spanner/TransactionRunnerImpl.java | 3 +-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index c92475202ff..a0b25cb64c0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -17,7 +17,6 @@ package com.google.cloud.spanner; import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; -import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -39,6 +38,7 @@ import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.spanner.v1.BeginTransactionRequest; @@ -73,7 +73,7 @@ abstract static class Builder, T extends AbstractReadCon private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS; private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS; private ExecutorProvider executorProvider; - private Clock clock; + private Clock clock = new Clock(); Builder() {} @@ -113,7 +113,7 @@ B setExecutorProvider(ExecutorProvider executorProvider) { } B setClock(Clock clock) { - this.clock = clock; + this.clock = Preconditions.checkNotNull(clock); return self(); } @@ -399,7 +399,7 @@ void initTransaction() { private final int defaultPrefetchChunks; private final QueryOptions defaultQueryOptions; - private Clock clock = new Clock(); + private final Clock clock; @GuardedBy("lock") private boolean isValid = true; @@ -425,7 +425,7 @@ void initTransaction() { this.defaultQueryOptions = builder.defaultQueryOptions; this.span = builder.span; this.executorProvider = builder.executorProvider; - this.clock = firstNonNull(builder.clock, this.clock); + this.clock = builder.clock; } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 20a32c6abf8..0e763dbc93d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -396,7 +396,9 @@ ApiFuture beginTransactionAsync(Options transactionOptions, boolean } TransactionContextImpl newTransaction(Options options) { - final Clock clock = spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock(); + // A clock instance is passed in {@code SessionPoolOptions} in order to allow mocking via tests. + final Clock poolMaintainerClock = + spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock(); return TransactionContextImpl.newBuilder() .setSession(this) .setOptions(options) @@ -408,7 +410,7 @@ TransactionContextImpl newTransaction(Options options) { .setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks()) .setSpan(currentSpan) .setExecutorProvider(spanner.getAsyncExecutorProvider()) - .setClock(clock) + .setClock(poolMaintainerClock == null ? new Clock() : poolMaintainerClock) .build(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index f675272ba06..21c74a400f0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -18,7 +18,6 @@ import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerBatchUpdateException; import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; -import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -205,7 +204,7 @@ private TransactionContextImpl(Builder builder) { this.trackTransactionStarter = builder.trackTransactionStarter; this.options = builder.options; this.finishedAsyncOperations.set(null); - this.clock = firstNonNull(builder.clock, this.clock); + this.clock = builder.clock; } @Override