Skip to content

Commit

Permalink
chore: add new members in SessionImpl for multiplexed session. Add a … (
Browse files Browse the repository at this point in the history
googleapis#2961)

* chore: add new members in SessionImpl for multiplexed session. Add a new method to create multiplexed session.

* chore: add unit tests.

* Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

Co-authored-by: Knut Olav Løite <[email protected]>

* fix: comments.

* chore: prefer junit assertions.

* chore: change to default method in SpannerRpc interface.

---------

Co-authored-by: Knut Olav Løite <[email protected]>
  • Loading branch information
2 people authored and tlhquynh committed Mar 25, 2024
1 parent 8e4ee03 commit ebf8723
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ SessionImpl createSession() {
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
options);
return new SessionImpl(spanner, session.getName(), options);
return new SessionImpl(
spanner, session.getName(), session.getCreateTime(), session.getMultiplexed(), options);
} catch (RuntimeException e) {
span.setStatus(e);
throw e;
Expand All @@ -224,6 +225,39 @@ SessionImpl createSession() {
}
}

/**
* Create a multiplexed session and returns it to the given {@link SessionConsumer}. A multiplexed
* session is not affiliated with any GRPC channel. The given {@link SessionConsumer} is
* guaranteed to eventually get exactly 1 multiplexed session unless an error occurs. In case of
* an error on the gRPC calls, the consumer will receive one {@link
* SessionConsumer#onSessionCreateFailure(Throwable, int)} calls with the error.
*
* @param consumer The {@link SessionConsumer} to use for callbacks when sessions are available.
*/
void createMultiplexedSession(SessionConsumer consumer) {
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
.getRpc()
.createSession(
db.getName(),
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
null,
true);
SessionImpl sessionImpl =
new SessionImpl(
spanner, session.getName(), session.getCreateTime(), session.getMultiplexed(), null);
consumer.onSessionReady(sessionImpl);
} catch (Throwable t) {
span.setStatus(t);
consumer.onSessionCreateFailure(t, 1);
} finally {
span.end();
}
}

/**
* Asynchronously creates a batch of sessions and returns these to the given {@link
* SessionConsumer}. This method may split the actual session creation over several gRPC calls in
Expand Down Expand Up @@ -311,7 +345,13 @@ private List<SessionImpl> internalBatchCreateSessions(
span.end();
List<SessionImpl> res = new ArrayList<>(sessionCount);
for (com.google.spanner.v1.Session session : sessions) {
res.add(new SessionImpl(spanner, session.getName(), options));
res.add(
new SessionImpl(
spanner,
session.getName(),
session.getCreateTime(),
session.getMultiplexed(),
options));
}
return res;
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ interface SessionTransaction {
ByteString readyTransactionId;
private final Map<SpannerRpc.Option, ?> options;
private volatile Instant lastUseTime;
@Nullable private final Instant createTime;
private final boolean isMultiplexed;
private ISpan currentSpan;

SessionImpl(SpannerImpl spanner, String name, Map<SpannerRpc.Option, ?> options) {
Expand All @@ -107,6 +109,24 @@ interface SessionTransaction {
this.name = checkNotNull(name);
this.databaseId = SessionId.of(name).getDatabaseId();
this.lastUseTime = Instant.now();
this.createTime = null;
this.isMultiplexed = false;
}

SessionImpl(
SpannerImpl spanner,
String name,
com.google.protobuf.Timestamp createTime,
boolean isMultiplexed,
Map<SpannerRpc.Option, ?> options) {
this.spanner = spanner;
this.tracer = spanner.getTracer();
this.options = options;
this.name = checkNotNull(name);
this.databaseId = SessionId.of(name).getDatabaseId();
this.lastUseTime = Instant.now();
this.createTime = convert(createTime);
this.isMultiplexed = isMultiplexed;
}

@Override
Expand All @@ -130,6 +150,14 @@ Instant getLastUseTime() {
return lastUseTime;
}

Instant getCreateTime() {
return createTime;
}

boolean getIsMultiplexed() {
return isMultiplexed;
}

void markUsed(Instant instant) {
lastUseTime = instant;
}
Expand Down Expand Up @@ -455,4 +483,11 @@ boolean hasReadyTransaction() {
TraceWrapper getTracer() {
return tracer;
}

private Instant convert(com.google.protobuf.Timestamp timestamp) {
if (timestamp == null) {
return null;
}
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class SpannerImpl extends BaseService<SpannerOptions> implements Spanner {
MetricRegistryConstants.INSTRUMENTATION_SCOPE,
GaxProperties.getLibraryVersion(this.getOptions().getClass())));

static final String CREATE_MULTIPLEXED_SESSION = "CloudSpannerOperation.CreateMultiplexedSession";
static final String CREATE_SESSION = "CloudSpannerOperation.CreateSession";
static final String BATCH_CREATE_SESSIONS = "CloudSpannerOperation.BatchCreateSessions";
static final String BATCH_CREATE_SESSIONS_REQUEST =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,18 @@ public Session createSession(
@Nullable Map<String, String> labels,
@Nullable Map<Option, ?> options)
throws SpannerException {
// By default sessions are not multiplexed
return createSession(databaseName, databaseRole, labels, options, false);
}

@Override
public Session createSession(
String databaseName,
@Nullable String databaseRole,
@Nullable Map<String, String> labels,
@Nullable Map<Option, ?> options,
boolean isMultiplexed)
throws SpannerException {
CreateSessionRequest.Builder requestBuilder =
CreateSessionRequest.newBuilder().setDatabase(databaseName);
Session.Builder sessionBuilder = Session.newBuilder();
Expand All @@ -1614,6 +1626,7 @@ public Session createSession(
if (databaseRole != null && !databaseRole.isEmpty()) {
sessionBuilder.setCreatorRole(databaseRole);
}
sessionBuilder.setMultiplexed(isMultiplexed);
requestBuilder.setSession(sessionBuilder);
CreateSessionRequest request = requestBuilder.build();
GrpcCallContext context =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,16 @@ Session createSession(
@Nullable Map<Option, ?> options)
throws SpannerException;

default Session createSession(
String databaseName,
@Nullable String databaseRole,
@Nullable Map<String, String> labels,
@Nullable Map<Option, ?> options,
boolean isMultiplexed)
throws SpannerException {
throw new UnsupportedOperationException("Unimplemented");
}

void deleteSession(String sessionName, @Nullable Map<Option, ?> options) throws SpannerException;

ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,9 @@ public void createSession(
CreateSessionRequest request, StreamObserver<Session> responseObserver) {
requests.add(request);
Preconditions.checkNotNull(request.getDatabase());
Preconditions.checkNotNull(request.getSession());
String name = generateSessionName(request.getDatabase());
Session requestSession = request.getSession();
try {
createSessionExecutionTime.simulateExecutionTime(
exceptions, stickyGlobalExceptions, freezeLock);
Expand All @@ -868,6 +870,7 @@ public void createSession(
.setCreateTime(now)
.setName(name)
.setApproximateLastUseTime(now)
.setMultiplexed(requestSession.getMultiplexed())
.build();
Session prev = sessions.putIfAbsent(name, session);
if (prev == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
Expand All @@ -29,6 +32,7 @@
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.common.collect.ImmutableMap;
import io.opencensus.trace.Tracing;
import io.opentelemetry.api.OpenTelemetry;
import java.util.ArrayList;
Expand Down Expand Up @@ -151,6 +155,81 @@ public void createAndCloseSession() {
}
}

@Test
public void createAndCloseMultiplexedSession() {
DatabaseId db = DatabaseId.of(dbName);
String sessionName = dbName + "/sessions/s1";
Map<String, String> labels = ImmutableMap.of("env", "dev");
String databaseRole = "role";
when(spannerOptions.getSessionLabels()).thenReturn(labels);
when(spannerOptions.getDatabaseRole()).thenReturn(databaseRole);
com.google.spanner.v1.Session sessionProto =
com.google.spanner.v1.Session.newBuilder()
.setName(sessionName)
.setMultiplexed(true)
.putAllLabels(labels)
.build();
when(rpc.createSession(
Mockito.eq(dbName),
Mockito.eq(databaseRole),
Mockito.eq(labels),
options.capture(),
Mockito.eq(true)))
.thenReturn(sessionProto);
final AtomicInteger returnedSessionCount = new AtomicInteger();
final SessionConsumer consumer =
new SessionConsumer() {
@Override
public void onSessionReady(SessionImpl session) {
assertEquals(sessionName, session.getName());
returnedSessionCount.incrementAndGet();

session.close();
Mockito.verify(rpc).deleteSession(sessionName, options.getValue());
}

@Override
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {}
};
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.createMultiplexedSession(consumer);
}
// for multiplexed session there is no channel hint pass in the RPC options
assertNull(options.getValue());
assertEquals(1, returnedSessionCount.get());
}

@Test
public void createAndCloseMultiplexedSession_whenRPCThrowsException_thenAssertException() {
DatabaseId db = DatabaseId.of(dbName);
Map<String, String> labels = ImmutableMap.of("env", "dev");
String databaseRole = "role";
when(spannerOptions.getSessionLabels()).thenReturn(labels);
when(spannerOptions.getDatabaseRole()).thenReturn(databaseRole);
when(rpc.createSession(
Mockito.eq(dbName),
Mockito.eq(databaseRole),
Mockito.eq(labels),
options.capture(),
Mockito.eq(true)))
.thenThrow(RuntimeException.class);
final SessionConsumer consumer =
new SessionConsumer() {
@Override
public void onSessionReady(SessionImpl session) {}

@Override
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {
assertTrue(t instanceof RuntimeException);
}
};
try (SessionClient client = new SessionClient(spanner, db, new TestExecutorFactory())) {
client.createMultiplexedSession(consumer);
}
// for multiplexed session there is no channel hint pass in the RPC options
assertNull(options.getValue());
}

@SuppressWarnings("unchecked")
@Test
public void batchCreateAndCloseSessions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.GetSessionRequest;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.Session;
import com.google.spanner.v1.SpannerGrpc;
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.StructType.Field;
Expand Down Expand Up @@ -668,6 +669,42 @@ public void testAdminStubSettings_whenStubNotInitialized_assertNullClientSetting
rpc.shutdown();
}

@Test
public void testCreateSession_assertSessionProto() {
SpannerOptions options = createSpannerOptions();
GapicSpannerRpc rpc = new GapicSpannerRpc(options, true);

Session session = rpc.createSession("DATABASE_NAME", null, null, null);
assertNotNull(session);
assertNotNull(session.getCreateTime());
assertEquals(false, session.getMultiplexed());
rpc.shutdown();
}

@Test
public void testCreateSession_whenMultiplexedSessionIsTrue_assertSessionProto() {
SpannerOptions options = createSpannerOptions();
GapicSpannerRpc rpc = new GapicSpannerRpc(options, true);

Session session = rpc.createSession("DATABASE_NAME", null, null, null, true);
assertNotNull(session);
assertNotNull(session.getCreateTime());
assertEquals(true, session.getMultiplexed());
rpc.shutdown();
}

@Test
public void testCreateSession_whenMultiplexedSessionIsFalse_assertSessionProto() {
SpannerOptions options = createSpannerOptions();
GapicSpannerRpc rpc = new GapicSpannerRpc(options, true);

Session session = rpc.createSession("DATABASE_NAME", null, null, null, false);
assertNotNull(session);
assertNotNull(session.getCreateTime());
assertEquals(false, session.getMultiplexed());
rpc.shutdown();
}

private SpannerOptions createSpannerOptions() {
String endpoint = address.getHostString() + ":" + server.getPort();
return SpannerOptions.newBuilder()
Expand Down

0 comments on commit ebf8723

Please sign in to comment.