Skip to content

Commit

Permalink
Spanner: Create new instance if existing Spanner is closed (#5200)
Browse files Browse the repository at this point in the history
* do not hand out a closed Spanner instance

SpannerOptions caches any Spanner instance that has been created, and
hands this cached instance out to all subsequent calls to
SpannerOptions.getService(). This also included closed Spanner
instances. The getService() method now returns an error if the Spanner
instance has already been closed.

* fix small merge error

* create a new instance if the service/rpc is closed

SpannerOptions.getService() and SpannerOptions.getRpc() should return a
new instance instead of throwing an exception if the service/rpc object
has been closed.

* add test case to ensure correct caching behavior

* use shouldRefreshService instead of createNewService

* fix merge conflicts

* added documentation to shouldRefresh... methods

* removed overrides only for comments

* fixed naming

* added assertions for isClosed()

* fixed formatting
  • Loading branch information
olavloite authored and kolea2 committed Jul 1, 2019
1 parent 9659047 commit 17cb858
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -494,24 +494,40 @@ static String getServiceAccountProjectId(String credentialsPath) {
*/
@SuppressWarnings("unchecked")
public ServiceT getService() {
if (service == null) {
if (shouldRefreshService(service)) {
service = serviceFactory.create((OptionsT) this);
}
return service;
}

/**
* @param cachedService The currently cached service object
* @return true if the currently cached service object should be refreshed.
*/
protected boolean shouldRefreshService(ServiceT cachedService) {
return cachedService == null;
}

/**
* Returns a Service RPC object for the current service. For instance, when using Google Cloud
* Storage, it returns a StorageRpc object.
*/
@SuppressWarnings("unchecked")
public ServiceRpc getRpc() {
if (rpc == null) {
if (shouldRefreshRpc(rpc)) {
rpc = serviceRpcFactory.create((OptionsT) this);
}
return rpc;
}

/**
* @param cachedRpc The currently cached service object
* @return true if the currently cached service object should be refreshed.
*/
protected boolean shouldRefreshRpc(ServiceRpc cachedRpc) {
return cachedRpc == null;
}

/**
* Returns the project ID. Return value can be null (for services that don't require a project
* ID).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,7 @@ public interface Spanner extends Service<SpannerOptions>, AutoCloseable {
*/
@Override
void close();

/** @return <code>true</code> if this {@link Spanner} object is closed. */
boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ public void close() {
}
}

@Override
public boolean isClosed() {
return spannerIsClosed;
}

/**
* Encapsulates state to be passed to the {@link SpannerRpc} layer for a given session. Currently
* used to select the {@link io.grpc.Channel} to be used in issuing the RPCs in a Session.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,26 @@ protected SpannerRpc getSpannerRpcV1() {
return (SpannerRpc) getRpc();
}

/**
* @return <code>true</code> if the cached Spanner service instance is <code>null</code> or
* closed. This will cause the method {@link #getService()} to create a new {@link SpannerRpc}
* instance when one is requested.
*/
@Override
protected boolean shouldRefreshService(Spanner cachedService) {
return cachedService == null || cachedService.isClosed();
}

/**
* @return <code>true</code> if the cached {@link ServiceRpc} instance is <code>null</code> or
* closed. This will cause the method {@link #getRpc()} to create a new {@link Spanner}
* instance when one is requested.
*/
@Override
protected boolean shouldRefreshRpc(ServiceRpc cachedRpc) {
return cachedRpc == null || ((SpannerRpc) cachedRpc).isClosed();
}

@SuppressWarnings("unchecked")
@Override
public Builder toBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ private synchronized void shutdown() {
private static final int DEFAULT_PERIOD_SECONDS = 10;

private final ManagedInstantiatingExecutorProvider executorProvider;
private boolean rpcIsClosed;
private final SpannerStub spannerStub;
private final InstanceAdminStub instanceAdminStub;
private final DatabaseAdminStub databaseAdminStub;
Expand Down Expand Up @@ -600,13 +601,19 @@ private GrpcCallContext newCallContext(@Nullable Map<Option, ?> options, String

@Override
public void shutdown() {
this.rpcIsClosed = true;
this.spannerStub.close();
this.instanceAdminStub.close();
this.databaseAdminStub.close();
this.spannerWatchdog.shutdown();
this.executorProvider.shutdown();
}

@Override
public boolean isClosed() {
return rpcIsClosed;
}

/**
* A {@code ResponseObserver} that exposes the {@code StreamController} and delegates callbacks to
* the {@link ResultStreamConsumer}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,6 @@ PartitionResponse partitionRead(PartitionReadRequest request, @Nullable Map<Opti
throws SpannerException;

public void shutdown();

boolean isClosed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;

import com.google.api.core.NanoClock;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.NoCredentials;
import com.google.cloud.ServiceRpc;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import java.util.HashMap;
Expand Down Expand Up @@ -122,4 +126,42 @@ public void getDbclientAfterCloseThrows() {
assertThat(e.getMessage()).contains("Cloud Spanner client has been closed");
}
}

@Test
public void testSpannerClosed() throws InterruptedException {
SpannerOptions options = createSpannerOptions();
Spanner spanner1 = options.getService();
Spanner spanner2 = options.getService();
ServiceRpc rpc1 = options.getRpc();
ServiceRpc rpc2 = options.getRpc();
// The SpannerOptions object should return the same instance.
assertThat(spanner1 == spanner2, is(true));
assertThat(rpc1 == rpc2, is(true));
spanner1.close();
// A new instance should be returned as the Spanner instance has been closed.
Spanner spanner3 = options.getService();
assertThat(spanner1 == spanner3, is(false));
// A new instance should be returned as the Spanner instance has been closed.
ServiceRpc rpc3 = options.getRpc();
assertThat(rpc1 == rpc3, is(false));
// Creating a copy of the SpannerOptions should result in new instances.
options = options.toBuilder().build();
Spanner spanner4 = options.getService();
ServiceRpc rpc4 = options.getRpc();
assertThat(spanner4 == spanner3, is(false));
assertThat(rpc4 == rpc3, is(false));
Spanner spanner5 = options.getService();
ServiceRpc rpc5 = options.getRpc();
assertThat(spanner4 == spanner5, is(true));
assertThat(rpc4 == rpc5, is(true));
spanner3.close();
spanner4.close();
}

private SpannerOptions createSpannerOptions() {
return SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
.setCredentials(NoCredentials.getInstance())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.NoCredentials;
import com.google.cloud.TransportOptions;
import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings;
import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings;
Expand Down Expand Up @@ -353,4 +356,30 @@ public void testNullSessionLabels() {
thrown.expect(NullPointerException.class);
SpannerOptions.newBuilder().setSessionLabels(null);
}

@Test
public void testDoNotCacheClosedSpannerInstance() {
SpannerOptions options =
SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
.setCredentials(NoCredentials.getInstance())
.build();
// Getting a service twice should give the same instance.
Spanner service1 = options.getService();
Spanner service2 = options.getService();
assertThat(service1 == service2, is(true));
assertThat(service1.isClosed()).isFalse();
// Closing a service instance should cause the SpannerOptions to create a new service.
service1.close();
Spanner service3 = options.getService();
assertThat(service3 == service1, is(false));
assertThat(service1.isClosed()).isTrue();
assertThat(service3.isClosed()).isFalse();
;
// Getting another service from the SpannerOptions should return the new cached instance.
Spanner service4 = options.getService();
assertThat(service3 == service4, is(true));
assertThat(service3.isClosed()).isFalse();
service3.close();
}
}

0 comments on commit 17cb858

Please sign in to comment.