Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: turn off autosavepoints by default #3303

Merged
merged 3 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1921,6 +1921,7 @@ UnitOfWork createNewUnitOfWork(
.build();
case READ_WRITE_TRANSACTION:
return ReadWriteTransaction.newBuilder()
.setUsesEmulator(options.usesEmulator())
.setUseAutoSavepointsForEmulator(options.useAutoSavepointsForEmulator())
.setDatabaseClient(dbClient)
.setDelayTransactionStartUntilFirstWrite(delayTransactionStartUntilFirstWrite)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Suppliers;
import com.google.common.collect.Sets;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import io.opentelemetry.api.OpenTelemetry;
Expand Down Expand Up @@ -382,6 +383,10 @@ private static String generateGuardedConnectionPropertyError(
+ "The instance and database in the connection string will automatically be created if these do not yet exist on the emulator. "
+ "Add dialect=postgresql to the connection string to make sure that the database that is created uses the PostgreSQL dialect.",
false),
ConnectionProperty.createBooleanProperty(
"useAutoSavepointsForEmulator",
"Automatically creates savepoints for each statement in a read/write transaction when using the Emulator. This is no longer needed when using Emulator version 1.5.23 or higher.",
false),
ConnectionProperty.createBooleanProperty(
LENIENT_PROPERTY_NAME,
"Silently ignore unknown properties in the connection string/properties (true/false)",
Expand Down Expand Up @@ -740,6 +745,7 @@ public static Builder newBuilder() {
private final boolean returnCommitStats;
private final Long maxCommitDelay;
private final boolean autoConfigEmulator;
private final boolean useAutoSavepointsForEmulator;
private final Dialect dialect;
private final RpcPriority rpcPriority;
private final DdlInTransactionMode ddlInTransactionMode;
Expand Down Expand Up @@ -801,6 +807,7 @@ private ConnectionOptions(Builder builder) {
this.returnCommitStats = parseReturnCommitStats(this.uri);
this.maxCommitDelay = parseMaxCommitDelay(this.uri);
this.autoConfigEmulator = parseAutoConfigEmulator(this.uri);
this.useAutoSavepointsForEmulator = parseUseAutoSavepointsForEmulator(this.uri);
this.dialect = parseDialect(this.uri);
this.usePlainText = this.autoConfigEmulator || parseUsePlainText(this.uri);
this.host =
Expand Down Expand Up @@ -1170,6 +1177,11 @@ static boolean parseAutoConfigEmulator(String uri) {
return Boolean.parseBoolean(value);
}

static boolean parseUseAutoSavepointsForEmulator(String uri) {
String value = parseUriProperty(uri, "useAutoSavepointsForEmulator");
return Boolean.parseBoolean(value);
}

@VisibleForTesting
static Dialect parseDialect(String uri) {
String value = parseUriProperty(uri, DIALECT_PROPERTY_NAME);
Expand Down Expand Up @@ -1535,6 +1547,14 @@ public Duration getMaxCommitDelay() {
return maxCommitDelay == null ? null : Duration.ofMillis(maxCommitDelay);
}

boolean usesEmulator() {
return Suppliers.memoize(
() ->
this.autoConfigEmulator
|| !Strings.isNullOrEmpty(System.getenv("SPANNER_EMULATOR_HOST")))
.get();
}

/**
* Whether connections created by this {@link ConnectionOptions} will automatically try to connect
* to the emulator using the default host/port of the emulator, and automatically create the
Expand All @@ -1548,11 +1568,11 @@ public boolean isAutoConfigEmulator() {
/**
* Returns true if a connection should generate auto-savepoints for retrying transactions on the
* emulator. This allows some more concurrent transactions on the emulator.
*
* <p>This is no longer needed since version 1.5.23 of the emulator.
*/
boolean useAutoSavepointsForEmulator() {
// For now, this option is directly linked to the option autoConfigEmulator=true, which is the
// recommended way to configure the emulator for the Connection API.
return autoConfigEmulator;
return useAutoSavepointsForEmulator;
}

public Dialect getDialect() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ class ReadWriteTransaction extends AbstractMultiUseTransaction {
*/
private static final String AUTO_SAVEPOINT_NAME = "_auto_savepoint";

private final boolean usesEmulator;

/**
* Indicates whether an automatic savepoint should be generated after each statement, so the
* transaction can be manually aborted and retried by the Connection API when connected to the
Expand Down Expand Up @@ -191,6 +193,7 @@ public void onSuccess(V result) {
}

static class Builder extends AbstractMultiUseTransaction.Builder<Builder, ReadWriteTransaction> {
private boolean usesEmulator;
private boolean useAutoSavepointsForEmulator;
private DatabaseClient dbClient;
private Boolean retryAbortsInternally;
Expand All @@ -203,6 +206,11 @@ static class Builder extends AbstractMultiUseTransaction.Builder<Builder, ReadWr

private Builder() {}

Builder setUsesEmulator(boolean usesEmulator) {
this.usesEmulator = usesEmulator;
return this;
}

Builder setUseAutoSavepointsForEmulator(boolean useAutoSavepoints) {
this.useAutoSavepointsForEmulator = useAutoSavepoints;
return this;
Expand Down Expand Up @@ -269,13 +277,13 @@ static Builder newBuilder() {
private ReadWriteTransaction(Builder builder) {
super(builder);
this.transactionId = ID_GENERATOR.incrementAndGet();
this.useAutoSavepointsForEmulator =
builder.useAutoSavepointsForEmulator && builder.retryAbortsInternally;
this.usesEmulator = builder.usesEmulator;
this.useAutoSavepointsForEmulator = builder.useAutoSavepointsForEmulator;
// Use a higher max for internal retries if auto-savepoints have been enabled for the emulator.
// This can cause a larger number of transactions to be aborted and retried, and retrying on the
// emulator is fast, so increasing the limit is reasonable.
this.maxInternalRetries =
this.useAutoSavepointsForEmulator
builder.usesEmulator && builder.retryAbortsInternally
? DEFAULT_MAX_INTERNAL_RETRIES * 10
: DEFAULT_MAX_INTERNAL_RETRIES;
this.dbClient = builder.dbClient;
Expand Down Expand Up @@ -1076,7 +1084,7 @@ private void handleAborted(AbortedException aborted) {
Thread.sleep(delay);
} else if (aborted.isEmulatorOnlySupportsOneTransactionException()) {
//noinspection BusyWait
Thread.sleep(ThreadLocalRandom.current().nextInt(50));
Thread.sleep(ThreadLocalRandom.current().nextInt(1, 5));
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@
import static com.google.cloud.spanner.testing.EmulatorSpannerHelper.isUsingEmulator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;

import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.ParallelIntegrationTest;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.connection.Connection;
import com.google.cloud.spanner.connection.ITAbstractSpannerTest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -41,14 +44,24 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

@Category(ParallelIntegrationTest.class)
@RunWith(JUnit4.class)
@RunWith(Parameterized.class)
public class ITEmulatorConcurrentTransactionsTest extends ITAbstractSpannerTest {
@Parameters(name = "Use auto-savepoints={0}")
public static Object[] parameters() {
return new Object[] {Boolean.TRUE, Boolean.FALSE};
}

@Parameter public boolean useAutoSavepointsForEmulator;

@Override
public void appendConnectionUri(StringBuilder uri) {
uri.append(";autoConfigEmulator=true;autoCommit=false");
uri.append(";autoConfigEmulator=true;autoCommit=false;useAutoSavepointsForEmulator=")
.append(useAutoSavepointsForEmulator);
}

@Override
Expand Down Expand Up @@ -118,15 +131,21 @@ public void testSingleThreadRandomTransactions() {
}

@Test
public void testMultiThreadedRandomTransactions() throws InterruptedException {
public void testMultiThreadedRandomTransactions() throws Exception {
int numThreads = ThreadLocalRandom.current().nextInt(10) + 5;
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
AtomicInteger numRowsInserted = new AtomicInteger();
List<Future<?>> futures = new ArrayList<>(numThreads);
for (int thread = 0; thread < numThreads; thread++) {
executor.submit(() -> runRandomTransactions(numRowsInserted));
futures.add(executor.submit(() -> runRandomTransactions(numRowsInserted)));
}
executor.shutdown();
assertTrue(executor.awaitTermination(60L, TimeUnit.SECONDS));
// Get the results of each transaction so the test case fails with a logical error message if
// any of the transactions failed.
for (Future<?> future : futures) {
assertNull(future.get());
}
verifyRowCount(numRowsInserted.get());
}

Expand All @@ -141,7 +160,7 @@ private void runRandomTransactions(AtomicInteger numRowsInserted) {
while (!connections.isEmpty()) {
int index = ThreadLocalRandom.current().nextInt(connections.size());
Connection connection = connections.get(index);
if (ThreadLocalRandom.current().nextInt(10) < 3) {
if (ThreadLocalRandom.current().nextInt(10) < 5) {
connection.commit();
connection.close();
assertEquals(connection, connections.remove(index));
Expand All @@ -155,6 +174,12 @@ private void runRandomTransactions(AtomicInteger numRowsInserted) {
.build()));
numRowsInserted.incrementAndGet();
}
try {
// Make sure to have a small wait between statements.
Thread.sleep(ThreadLocalRandom.current().nextInt(1, 5));
} catch (InterruptedException interruptedException) {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
}
}
} finally {
for (Connection connection : connections) {
Expand Down
Loading