From dd7216dcd3bb1cee1ca7a3e4ba7443354b8d692b Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Fri, 31 Mar 2023 14:15:38 -0400 Subject: [PATCH 1/4] feat: add pubsublite sink support for credentials settings psl source support requires more work also unify credentialsProvider creation --- .../common/ConnectorCredentialsProvider.java | 64 +++++++++++++++---- .../kafka/sink/CloudPubSubSinkConnector.java | 4 +- .../kafka/sink/CloudPubSubSinkTask.java | 20 +----- .../source/CloudPubSubSourceConnector.java | 29 ++------- .../kafka/source/CloudPubSubSourceTask.java | 32 +++++----- .../pubsublite/kafka/sink/ConfigDefs.java | 15 ++++- .../kafka/sink/PublisherFactoryImpl.java | 45 ++++++++----- .../pubsublite/kafka/source/ConfigDefs.java | 15 ++++- 8 files changed, 134 insertions(+), 90 deletions(-) diff --git a/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java b/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java index 4c36a2a..16ccd62 100644 --- a/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java +++ b/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java @@ -23,29 +23,65 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; public class ConnectorCredentialsProvider implements CredentialsProvider { - - private static final List CPS_SCOPE = - Arrays.asList("https://www.googleapis.com/auth/pubsub"); + private static final List GCP_SCOPE = + Arrays.asList("https://www.googleapis.com/auth/cloud-platform"); GoogleCredentials credentials; - public void loadFromFile(String credentialPath) throws IOException { - this.credentials = GoogleCredentials.fromStream(new FileInputStream(credentialPath)); + private ConnectorCredentialsProvider(GoogleCredentials credentials) { + this.credentials = credentials.createScoped(GCP_SCOPE); } - public void loadJson(String credentialsJson) throws IOException { - ByteArrayInputStream bs = new ByteArrayInputStream(credentialsJson.getBytes()); - this.credentials = credentials = GoogleCredentials.fromStream(bs); + public static ConnectorCredentialsProvider fromConfig(Map config) { + String credentialsPath = config.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG).toString(); + String credentialsJson = config.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG).toString(); + if (!credentialsPath.isEmpty()) { + if (!credentialsJson.isEmpty()) { + throw new IllegalArgumentException( + "May not set both " + + ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG + + " and " + + ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); + } + return ConnectorCredentialsProvider.fromFile(credentialsPath); + } else if (credentialsJson != null) { + return ConnectorCredentialsProvider.fromJson(credentialsJson); + } else { + return ConnectorCredentialsProvider.fromDefault(); + } } - @Override - public Credentials getCredentials() throws IOException { - if (this.credentials == null) { - return GoogleCredentials.getApplicationDefault().createScoped(this.CPS_SCOPE); - } else { - return this.credentials.createScoped(this.CPS_SCOPE); + public static ConnectorCredentialsProvider fromFile(String credentialPath) { + try { + return new ConnectorCredentialsProvider( + GoogleCredentials.fromStream(new FileInputStream(credentialPath))); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to load credentials.", e); + } + } + + public static ConnectorCredentialsProvider fromJson(String credentialsJson) { + try { + return new ConnectorCredentialsProvider( + GoogleCredentials.fromStream(new ByteArrayInputStream(credentialsJson.getBytes()))); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to load credentials.", e); + } + } + + public static ConnectorCredentialsProvider fromDefault() { + try { + return new ConnectorCredentialsProvider(GoogleCredentials.getApplicationDefault()); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to load credentials.", e); } } + + @Override + public Credentials getCredentials() { + return credentials; + } } diff --git a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java index 7b083d4..eb7d681 100644 --- a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java +++ b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java @@ -233,13 +233,13 @@ public ConfigDef config() { .define( ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG, Type.STRING, - null, + "", Importance.HIGH, "The path to the GCP credentials file") .define( ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG, Type.STRING, - null, + "", Importance.HIGH, "GCP JSON credentials") .define( diff --git a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java index 5c31a9d..9faa791 100644 --- a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java +++ b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java @@ -33,7 +33,6 @@ import com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.OrderingKeySource; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -136,24 +135,7 @@ public void start(Map props) { orderingKeySource = OrderingKeySource.getEnum( (String) validatedProps.get(CloudPubSubSinkConnector.ORDERING_KEY_SOURCE)); - gcpCredentialsProvider = new ConnectorCredentialsProvider(); - String credentialsPath = - (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG); - String credentialsJson = - (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); - if (credentialsPath != null) { - try { - gcpCredentialsProvider.loadFromFile(credentialsPath); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else if (credentialsJson != null) { - try { - gcpCredentialsProvider.loadJson(credentialsJson); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + gcpCredentialsProvider = ConnectorCredentialsProvider.fromConfig(validatedProps); if (publisher == null) { // Only do this if we did not use the constructor. createPublisher(); diff --git a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java index 4fd649c..3f507d2 100644 --- a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java +++ b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java @@ -22,7 +22,6 @@ import com.google.pubsub.kafka.common.ConnectorCredentialsProvider; import com.google.pubsub.kafka.common.ConnectorUtils; import com.google.pubsub.v1.GetSubscriptionRequest; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -138,25 +137,11 @@ public String version() { public void start(Map props) { // Do a validation of configs here too so that we do not pass null objects to // verifySubscription(). - config().parse(props); - String cpsProject = props.get(ConnectorUtils.CPS_PROJECT_CONFIG); - String cpsSubscription = props.get(CPS_SUBSCRIPTION_CONFIG); - String credentialsPath = props.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG); - String credentialsJson = props.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); - ConnectorCredentialsProvider credentialsProvider = new ConnectorCredentialsProvider(); - if (credentialsPath != null) { - try { - credentialsProvider.loadFromFile(credentialsPath); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else if (credentialsJson != null) { - try { - credentialsProvider.loadJson(credentialsJson); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + Map validated = config().parse(props); + String cpsProject = validated.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString(); + String cpsSubscription = validated.get(CPS_SUBSCRIPTION_CONFIG).toString(); + ConnectorCredentialsProvider credentialsProvider = + ConnectorCredentialsProvider.fromConfig(validated); verifySubscription(cpsProject, cpsSubscription, credentialsProvider); this.props = props; @@ -271,13 +256,13 @@ public ConfigDef config() { .define( ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG, Type.STRING, - null, + "", Importance.HIGH, "The path to the GCP credentials file") .define( ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG, Type.STRING, - null, + "", Importance.HIGH, "GCP JSON credentials") .define( diff --git a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java index 7319cea..ad4f374 100644 --- a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java +++ b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java @@ -34,7 +34,6 @@ import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -114,11 +113,6 @@ public void start(Map props) { useKafkaHeaders = (Boolean) validatedProps.get(CloudPubSubSourceConnector.USE_KAFKA_HEADERS); makeOrderingKeyAttribute = (Boolean) validatedProps.get(CloudPubSubSourceConnector.CPS_MAKE_ORDERING_KEY_ATTRIBUTE); - ConnectorCredentialsProvider gcpCredentialsProvider = new ConnectorCredentialsProvider(); - String gcpCredentialsFilePath = - (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG); - String credentialsJson = - (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); boolean useStreamingPull = (Boolean) validatedProps.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_ENABLED); long streamingPullBytes = @@ -136,18 +130,24 @@ public void start(Map props) { (Long) validatedProps.get( CloudPubSubSourceConnector.CPS_STREAMING_PULL_MAX_MS_PER_ACK_EXTENSION); - if (gcpCredentialsFilePath != null) { - try { - gcpCredentialsProvider.loadFromFile(gcpCredentialsFilePath); - } catch (IOException e) { - throw new RuntimeException(e); + String credentialsPath = + (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG); + String credentialsJson = + (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); + ConnectorCredentialsProvider gcpCredentialsProvider; + if (credentialsPath != null) { + if (credentialsJson != null) { + throw new IllegalArgumentException( + "May not set both " + + ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG + + " and " + + ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); } + gcpCredentialsProvider = ConnectorCredentialsProvider.fromFile(credentialsPath); } else if (credentialsJson != null) { - try { - gcpCredentialsProvider.loadJson(credentialsJson); - } catch (IOException e) { - throw new RuntimeException(e); - } + gcpCredentialsProvider = ConnectorCredentialsProvider.fromJson(credentialsJson); + } else { + gcpCredentialsProvider = ConnectorCredentialsProvider.fromDefault(); } // Only do this if we did not set it through the constructor. if (subscriber == null) { diff --git a/src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java b/src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java index 932c20e..12d59eb 100644 --- a/src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java +++ b/src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java @@ -15,6 +15,7 @@ */ package com.google.pubsublite.kafka.sink; +import com.google.pubsub.kafka.common.ConnectorUtils; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -49,6 +50,18 @@ static ConfigDef config() { ConfigDef.Type.STRING, OrderingMode.DEFAULT.name(), Importance.HIGH, - "The ordering mode to use for publishing to Pub/Sub Lite. If set to `KAFKA`, messages will be republished to the same partition index they were read from on the source topic. Note that this means the Pub/Sub Lite topic *must* have the same number of partitions as the source Kafka topic."); + "The ordering mode to use for publishing to Pub/Sub Lite. If set to `KAFKA`, messages will be republished to the same partition index they were read from on the source topic. Note that this means the Pub/Sub Lite topic *must* have the same number of partitions as the source Kafka topic.") + .define( + ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG, + ConfigDef.Type.STRING, + "", + Importance.HIGH, + "The path to the GCP credentials file") + .define( + ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG, + ConfigDef.Type.STRING, + "", + Importance.HIGH, + "GCP JSON credentials"); } } diff --git a/src/main/java/com/google/pubsublite/kafka/sink/PublisherFactoryImpl.java b/src/main/java/com/google/pubsublite/kafka/sink/PublisherFactoryImpl.java index 47b93fa..09c62b2 100644 --- a/src/main/java/com/google/pubsublite/kafka/sink/PublisherFactoryImpl.java +++ b/src/main/java/com/google/pubsublite/kafka/sink/PublisherFactoryImpl.java @@ -37,17 +37,21 @@ import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder; +import com.google.cloud.pubsublite.v1.AdminServiceClient; +import com.google.cloud.pubsublite.v1.AdminServiceSettings; import com.google.cloud.pubsublite.v1.PublisherServiceClient; import com.google.cloud.pubsublite.v1.PublisherServiceSettings; +import com.google.pubsub.kafka.common.ConnectorCredentialsProvider; +import java.io.IOException; import java.util.Map; import java.util.Optional; -import org.apache.kafka.common.config.ConfigValue; class PublisherFactoryImpl implements PublisherFactory { private static final Framework FRAMEWORK = Framework.of("KAFKA_CONNECT"); - private PartitionPublisherFactory getPartitionPublisherFactory(TopicPath topic) { + private PartitionPublisherFactory getPartitionPublisherFactory( + TopicPath topic, ConnectorCredentialsProvider credentialsProvider) { return new PartitionPublisherFactory() { private Optional publisherServiceClient = Optional.empty(); @@ -61,9 +65,7 @@ private synchronized PublisherServiceClient getServiceClient() throws ApiExcepti addDefaultSettings( topic.location().extractRegion(), PublisherServiceSettings.newBuilder() - .setCredentialsProvider( - PublisherServiceSettings.defaultCredentialsProviderBuilder() - .build())))); + .setCredentialsProvider(credentialsProvider)))); return publisherServiceClient.get(); } catch (Throwable t) { throw toCanonical(t).underlying; @@ -95,25 +97,38 @@ public void close() {} @Override public Publisher newPublisher(Map params) { - Map config = ConfigDefs.config().validateAll(params); + Map config = ConfigDefs.config().parse(params); + ConnectorCredentialsProvider credentialsProvider = + ConnectorCredentialsProvider.fromConfig(config); CloudRegionOrZone location = - CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString()); + CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).toString()); PartitionCountWatchingPublisherSettings.Builder builder = PartitionCountWatchingPublisherSettings.newBuilder(); TopicPath topic = TopicPath.newBuilder() .setProject( - ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG).value()) - .project()) + ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG)).project()) .setLocation(location) - .setName(TopicName.of(config.get(ConfigDefs.TOPIC_NAME_FLAG).value().toString())) + .setName(TopicName.of(config.get(ConfigDefs.TOPIC_NAME_FLAG).toString())) .build(); builder.setTopic(topic); - builder.setPublisherFactory(getPartitionPublisherFactory(topic)); - builder.setAdminClient( - AdminClient.create( - AdminClientSettings.newBuilder().setRegion(location.extractRegion()).build())); - if (OrderingMode.valueOf(config.get(ConfigDefs.ORDERING_MODE_FLAG).value().toString()) + builder.setPublisherFactory(getPartitionPublisherFactory(topic, credentialsProvider)); + try { + builder.setAdminClient( + AdminClient.create( + AdminClientSettings.newBuilder() + .setRegion(location.extractRegion()) + .setServiceClient( + AdminServiceClient.create( + addDefaultSettings( + location.extractRegion(), + AdminServiceSettings.newBuilder() + .setCredentialsProvider(credentialsProvider)))) + .build())); + } catch (IOException e) { + throw new IllegalStateException(e); + } + if (OrderingMode.valueOf(config.get(ConfigDefs.ORDERING_MODE_FLAG).toString()) == OrderingMode.KAFKA) { builder.setRoutingPolicyFactory(KafkaPartitionRoutingPolicy::new); } diff --git a/src/main/java/com/google/pubsublite/kafka/source/ConfigDefs.java b/src/main/java/com/google/pubsublite/kafka/source/ConfigDefs.java index 2dbf1d3..3694266 100644 --- a/src/main/java/com/google/pubsublite/kafka/source/ConfigDefs.java +++ b/src/main/java/com/google/pubsublite/kafka/source/ConfigDefs.java @@ -15,6 +15,7 @@ */ package com.google.pubsublite.kafka.source; +import com.google.pubsub.kafka.common.ConnectorUtils; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -63,6 +64,18 @@ static ConfigDef config() { ConfigDef.Type.LONG, 20_000_000, Importance.MEDIUM, - "The number of outstanding bytes per-partition allowed. Set to 20MB by default."); + "The number of outstanding bytes per-partition allowed. Set to 20MB by default.") + .define( + ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG, + ConfigDef.Type.STRING, + "", + Importance.HIGH, + "The path to the GCP credentials file") + .define( + ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG, + ConfigDef.Type.STRING, + "", + Importance.HIGH, + "GCP JSON credentials"); } } From 827004f79ba9a979b0ef97897bc1b3a7fe9b8afd Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Fri, 31 Mar 2023 14:23:23 -0400 Subject: [PATCH 2/4] feat: add pubsublite sink support for credentials settings psl source support requires more work also unify credentialsProvider creation --- .../kafka/source/CloudPubSubSourceTask.java | 22 +++---------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java index ad4f374..39a8f2a 100644 --- a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java +++ b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java @@ -130,25 +130,9 @@ public void start(Map props) { (Long) validatedProps.get( CloudPubSubSourceConnector.CPS_STREAMING_PULL_MAX_MS_PER_ACK_EXTENSION); - String credentialsPath = - (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG); - String credentialsJson = - (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); - ConnectorCredentialsProvider gcpCredentialsProvider; - if (credentialsPath != null) { - if (credentialsJson != null) { - throw new IllegalArgumentException( - "May not set both " - + ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG - + " and " - + ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); - } - gcpCredentialsProvider = ConnectorCredentialsProvider.fromFile(credentialsPath); - } else if (credentialsJson != null) { - gcpCredentialsProvider = ConnectorCredentialsProvider.fromJson(credentialsJson); - } else { - gcpCredentialsProvider = ConnectorCredentialsProvider.fromDefault(); - } + ConnectorCredentialsProvider gcpCredentialsProvider = + ConnectorCredentialsProvider.fromConfig(validatedProps); + // Only do this if we did not set it through the constructor. if (subscriber == null) { if (useStreamingPull) { From 2aaa746a04323949f10858142c3ae8fb2e9a09da Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Fri, 31 Mar 2023 14:27:36 -0400 Subject: [PATCH 3/4] feat: add pubsublite sink support for credentials settings psl source support requires more work also unify credentialsProvider creation --- .../pubsub/kafka/common/ConnectorCredentialsProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java b/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java index 16ccd62..0332c37 100644 --- a/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java +++ b/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java @@ -47,7 +47,7 @@ public static ConnectorCredentialsProvider fromConfig(Map config + ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); } return ConnectorCredentialsProvider.fromFile(credentialsPath); - } else if (credentialsJson != null) { + } else if (!credentialsJson.isEmpty()) { return ConnectorCredentialsProvider.fromJson(credentialsJson); } else { return ConnectorCredentialsProvider.fromDefault(); From 88fb57615297d72373807a8f60f5f1685f7d40c5 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Fri, 31 Mar 2023 14:37:24 -0400 Subject: [PATCH 4/4] feat: add pubsublite sink support for credentials settings psl source support requires more work also unify credentialsProvider creation --- .../common/ConnectorCredentialsProvider.java | 37 ++++++++----------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java b/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java index 0332c37..6fccf7f 100644 --- a/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java +++ b/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java @@ -29,10 +29,10 @@ public class ConnectorCredentialsProvider implements CredentialsProvider { private static final List GCP_SCOPE = Arrays.asList("https://www.googleapis.com/auth/cloud-platform"); - GoogleCredentials credentials; + CredentialsProvider impl; - private ConnectorCredentialsProvider(GoogleCredentials credentials) { - this.credentials = credentials.createScoped(GCP_SCOPE); + private ConnectorCredentialsProvider(CredentialsProvider impl) { + this.impl = impl; } public static ConnectorCredentialsProvider fromConfig(Map config) { @@ -55,33 +55,26 @@ public static ConnectorCredentialsProvider fromConfig(Map config } public static ConnectorCredentialsProvider fromFile(String credentialPath) { - try { - return new ConnectorCredentialsProvider( - GoogleCredentials.fromStream(new FileInputStream(credentialPath))); - } catch (IOException e) { - throw new IllegalArgumentException("Failed to load credentials.", e); - } + return new ConnectorCredentialsProvider( + () -> + GoogleCredentials.fromStream(new FileInputStream(credentialPath)) + .createScoped(GCP_SCOPE)); } public static ConnectorCredentialsProvider fromJson(String credentialsJson) { - try { - return new ConnectorCredentialsProvider( - GoogleCredentials.fromStream(new ByteArrayInputStream(credentialsJson.getBytes()))); - } catch (IOException e) { - throw new IllegalArgumentException("Failed to load credentials.", e); - } + return new ConnectorCredentialsProvider( + () -> + GoogleCredentials.fromStream(new ByteArrayInputStream(credentialsJson.getBytes())) + .createScoped(GCP_SCOPE)); } public static ConnectorCredentialsProvider fromDefault() { - try { - return new ConnectorCredentialsProvider(GoogleCredentials.getApplicationDefault()); - } catch (IOException e) { - throw new IllegalArgumentException("Failed to load credentials.", e); - } + return new ConnectorCredentialsProvider( + () -> GoogleCredentials.getApplicationDefault().createScoped(GCP_SCOPE)); } @Override - public Credentials getCredentials() { - return credentials; + public Credentials getCredentials() throws IOException { + return impl.getCredentials(); } }