diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index cbf9b8a839..119b410c7a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -66,6 +66,9 @@ public class ConnectionWorkerPool { */ private final java.time.Duration maxRetryDuration; + /* + * Retry settings for in-stream retries. + */ private RetrySettings retrySettings; /* @@ -208,7 +211,8 @@ public abstract static class Builder { FlowController.LimitExceededBehavior limitExceededBehavior, String traceId, @Nullable String comperssorName, - BigQueryWriteSettings clientSettings) { + BigQueryWriteSettings clientSettings, + RetrySettings retrySettings) { this.maxInflightRequests = maxInflightRequests; this.maxInflightBytes = maxInflightBytes; this.maxRetryDuration = maxRetryDuration; @@ -217,8 +221,7 @@ public abstract static class Builder { this.compressorName = comperssorName; this.clientSettings = clientSettings; this.currentMaxConnectionCount = settings.minConnectionsPerRegion(); - // In-stream retry is not enabled for multiplexing. - this.retrySettings = null; + this.retrySettings = retrySettings; } /** diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 35e39d708f..35ebfb316c 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -237,12 +237,6 @@ private StreamWriter(Builder builder) throws IOException { "Trying to enable connection pool in non-default stream."); } - if (builder.retrySettings != null) { - log.warning("Retry settings is only allowed when connection pool is not enabled."); - throw new IllegalArgumentException( - "Trying to enable connection pool while providing retry settings."); - } - // We need a client to perform some getWriteStream calls. BigQueryWriteClient client = builder.client != null ? builder.client : new BigQueryWriteClient(clientSettings); @@ -295,7 +289,8 @@ private StreamWriter(Builder builder) throws IOException { builder.limitExceededBehavior, builder.traceId, builder.compressorName, - client.getSettings()); + client.getSettings(), + builder.retrySettings); })); validateFetchedConnectonPool(builder); // If the client is not from outside, then shutdown the client we created. diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index 5770f7734b..194749b633 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -23,6 +23,7 @@ import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings; import com.google.common.util.concurrent.ListeningExecutorService; @@ -58,6 +59,17 @@ public class ConnectionWorkerPoolTest { private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/_default"; private static final String TEST_STREAM_2 = "projects/p1/datasets/d1/tables/t2/streams/_default"; + private static final int MAX_RETRY_NUM_ATTEMPTS = 3; + private static final long INITIAL_RETRY_MILLIS = 500; + private static final double RETRY_MULTIPLIER = 1.3; + private static final int MAX_RETRY_DELAY_MINUTES = 5; + private static final RetrySettings retrySettings = + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(INITIAL_RETRY_MILLIS)) + .setRetryDelayMultiplier(RETRY_MULTIPLIER) + .setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS) + .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(MAX_RETRY_DELAY_MINUTES)) + .build(); @Before public void setUp() throws Exception { @@ -398,6 +410,7 @@ public void testCloseExternalClient() .setWriterSchema(createProtoSchema()) .setTraceId(TEST_TRACE_ID) .setLocation("us") + .setRetrySettings(retrySettings) .build()); } @@ -483,6 +496,7 @@ ConnectionWorkerPool createConnectionWorkerPool( FlowController.LimitExceededBehavior.Block, TEST_TRACE_ID, null, - clientSettings); + clientSettings, + retrySettings); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index b053abdd5f..5de9045037 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -536,25 +536,6 @@ public void testShortenStreamNameAllowed() throws Exception { .build(); } - @Test - public void testNoRetryWhenConnectionPoolEnabled() throws Exception { - IllegalArgumentException ex = - assertThrows( - IllegalArgumentException.class, - new ThrowingRunnable() { - @Override - public void run() throws Throwable { - StreamWriter.newBuilder(TEST_STREAM_SHORTEN, client) - .setEnableConnectionPool(true) - .setRetrySettings(RetrySettings.newBuilder().build()) - .build(); - } - }); - assertTrue( - ex.getMessage() - .contains("Trying to enable connection pool while providing retry settings.")); - } - @Test public void testAppendSuccessAndConnectionError() throws Exception { StreamWriter writer = @@ -1429,6 +1410,7 @@ public StreamWriter getMultiplexingStreamWriter(String streamName) throws IOExce .setMaxInflightRequests(10) .setLocation("US") .setMaxRetryDuration(java.time.Duration.ofMillis(100)) + .setRetrySettings(retrySettings) .build(); }