diff --git a/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java b/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java index 4c36a2a..6fccf7f 100644 --- a/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java +++ b/src/main/java/com/google/pubsub/kafka/common/ConnectorCredentialsProvider.java @@ -23,29 +23,58 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; public class ConnectorCredentialsProvider implements CredentialsProvider { + private static final List GCP_SCOPE = + Arrays.asList("https://www.googleapis.com/auth/cloud-platform"); - private static final List CPS_SCOPE = - Arrays.asList("https://www.googleapis.com/auth/pubsub"); + CredentialsProvider impl; - GoogleCredentials credentials; + private ConnectorCredentialsProvider(CredentialsProvider impl) { + this.impl = impl; + } + + public static ConnectorCredentialsProvider fromConfig(Map config) { + String credentialsPath = config.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG).toString(); + String credentialsJson = config.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG).toString(); + if (!credentialsPath.isEmpty()) { + if (!credentialsJson.isEmpty()) { + throw new IllegalArgumentException( + "May not set both " + + ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG + + " and " + + ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); + } + return ConnectorCredentialsProvider.fromFile(credentialsPath); + } else if (!credentialsJson.isEmpty()) { + return ConnectorCredentialsProvider.fromJson(credentialsJson); + } else { + return ConnectorCredentialsProvider.fromDefault(); + } + } - public void loadFromFile(String credentialPath) throws IOException { - this.credentials = GoogleCredentials.fromStream(new FileInputStream(credentialPath)); + public static ConnectorCredentialsProvider fromFile(String credentialPath) { + return new ConnectorCredentialsProvider( + () -> + GoogleCredentials.fromStream(new FileInputStream(credentialPath)) + .createScoped(GCP_SCOPE)); } - public void loadJson(String credentialsJson) throws IOException { - ByteArrayInputStream bs = new ByteArrayInputStream(credentialsJson.getBytes()); - this.credentials = credentials = GoogleCredentials.fromStream(bs); + public static ConnectorCredentialsProvider fromJson(String credentialsJson) { + return new ConnectorCredentialsProvider( + () -> + GoogleCredentials.fromStream(new ByteArrayInputStream(credentialsJson.getBytes())) + .createScoped(GCP_SCOPE)); + } + + public static ConnectorCredentialsProvider fromDefault() { + return new ConnectorCredentialsProvider( + () -> GoogleCredentials.getApplicationDefault().createScoped(GCP_SCOPE)); } @Override public Credentials getCredentials() throws IOException { - if (this.credentials == null) { - return GoogleCredentials.getApplicationDefault().createScoped(this.CPS_SCOPE); - } else { - return this.credentials.createScoped(this.CPS_SCOPE); - } + return impl.getCredentials(); } } diff --git a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java index 7b083d4..eb7d681 100644 --- a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java +++ b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkConnector.java @@ -233,13 +233,13 @@ public ConfigDef config() { .define( ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG, Type.STRING, - null, + "", Importance.HIGH, "The path to the GCP credentials file") .define( ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG, Type.STRING, - null, + "", Importance.HIGH, "GCP JSON credentials") .define( diff --git a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java index 5c31a9d..9faa791 100644 --- a/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java +++ b/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java @@ -33,7 +33,6 @@ import com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.OrderingKeySource; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -136,24 +135,7 @@ public void start(Map props) { orderingKeySource = OrderingKeySource.getEnum( (String) validatedProps.get(CloudPubSubSinkConnector.ORDERING_KEY_SOURCE)); - gcpCredentialsProvider = new ConnectorCredentialsProvider(); - String credentialsPath = - (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG); - String credentialsJson = - (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); - if (credentialsPath != null) { - try { - gcpCredentialsProvider.loadFromFile(credentialsPath); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else if (credentialsJson != null) { - try { - gcpCredentialsProvider.loadJson(credentialsJson); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + gcpCredentialsProvider = ConnectorCredentialsProvider.fromConfig(validatedProps); if (publisher == null) { // Only do this if we did not use the constructor. createPublisher(); diff --git a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java index 4fd649c..3f507d2 100644 --- a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java +++ b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java @@ -22,7 +22,6 @@ import com.google.pubsub.kafka.common.ConnectorCredentialsProvider; import com.google.pubsub.kafka.common.ConnectorUtils; import com.google.pubsub.v1.GetSubscriptionRequest; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -138,25 +137,11 @@ public String version() { public void start(Map props) { // Do a validation of configs here too so that we do not pass null objects to // verifySubscription(). - config().parse(props); - String cpsProject = props.get(ConnectorUtils.CPS_PROJECT_CONFIG); - String cpsSubscription = props.get(CPS_SUBSCRIPTION_CONFIG); - String credentialsPath = props.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG); - String credentialsJson = props.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); - ConnectorCredentialsProvider credentialsProvider = new ConnectorCredentialsProvider(); - if (credentialsPath != null) { - try { - credentialsProvider.loadFromFile(credentialsPath); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else if (credentialsJson != null) { - try { - credentialsProvider.loadJson(credentialsJson); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + Map validated = config().parse(props); + String cpsProject = validated.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString(); + String cpsSubscription = validated.get(CPS_SUBSCRIPTION_CONFIG).toString(); + ConnectorCredentialsProvider credentialsProvider = + ConnectorCredentialsProvider.fromConfig(validated); verifySubscription(cpsProject, cpsSubscription, credentialsProvider); this.props = props; @@ -271,13 +256,13 @@ public ConfigDef config() { .define( ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG, Type.STRING, - null, + "", Importance.HIGH, "The path to the GCP credentials file") .define( ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG, Type.STRING, - null, + "", Importance.HIGH, "GCP JSON credentials") .define( diff --git a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java index 7319cea..39a8f2a 100644 --- a/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java +++ b/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java @@ -34,7 +34,6 @@ import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -114,11 +113,6 @@ public void start(Map props) { useKafkaHeaders = (Boolean) validatedProps.get(CloudPubSubSourceConnector.USE_KAFKA_HEADERS); makeOrderingKeyAttribute = (Boolean) validatedProps.get(CloudPubSubSourceConnector.CPS_MAKE_ORDERING_KEY_ATTRIBUTE); - ConnectorCredentialsProvider gcpCredentialsProvider = new ConnectorCredentialsProvider(); - String gcpCredentialsFilePath = - (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG); - String credentialsJson = - (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG); boolean useStreamingPull = (Boolean) validatedProps.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_ENABLED); long streamingPullBytes = @@ -136,19 +130,9 @@ public void start(Map props) { (Long) validatedProps.get( CloudPubSubSourceConnector.CPS_STREAMING_PULL_MAX_MS_PER_ACK_EXTENSION); - if (gcpCredentialsFilePath != null) { - try { - gcpCredentialsProvider.loadFromFile(gcpCredentialsFilePath); - } catch (IOException e) { - throw new RuntimeException(e); - } - } else if (credentialsJson != null) { - try { - gcpCredentialsProvider.loadJson(credentialsJson); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + ConnectorCredentialsProvider gcpCredentialsProvider = + ConnectorCredentialsProvider.fromConfig(validatedProps); + // Only do this if we did not set it through the constructor. if (subscriber == null) { if (useStreamingPull) { diff --git a/src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java b/src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java index 932c20e..12d59eb 100644 --- a/src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java +++ b/src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java @@ -15,6 +15,7 @@ */ package com.google.pubsublite.kafka.sink; +import com.google.pubsub.kafka.common.ConnectorUtils; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -49,6 +50,18 @@ static ConfigDef config() { ConfigDef.Type.STRING, OrderingMode.DEFAULT.name(), Importance.HIGH, - "The ordering mode to use for publishing to Pub/Sub Lite. If set to `KAFKA`, messages will be republished to the same partition index they were read from on the source topic. Note that this means the Pub/Sub Lite topic *must* have the same number of partitions as the source Kafka topic."); + "The ordering mode to use for publishing to Pub/Sub Lite. If set to `KAFKA`, messages will be republished to the same partition index they were read from on the source topic. Note that this means the Pub/Sub Lite topic *must* have the same number of partitions as the source Kafka topic.") + .define( + ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG, + ConfigDef.Type.STRING, + "", + Importance.HIGH, + "The path to the GCP credentials file") + .define( + ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG, + ConfigDef.Type.STRING, + "", + Importance.HIGH, + "GCP JSON credentials"); } } diff --git a/src/main/java/com/google/pubsublite/kafka/sink/PublisherFactoryImpl.java b/src/main/java/com/google/pubsublite/kafka/sink/PublisherFactoryImpl.java index 47b93fa..09c62b2 100644 --- a/src/main/java/com/google/pubsublite/kafka/sink/PublisherFactoryImpl.java +++ b/src/main/java/com/google/pubsublite/kafka/sink/PublisherFactoryImpl.java @@ -37,17 +37,21 @@ import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder; +import com.google.cloud.pubsublite.v1.AdminServiceClient; +import com.google.cloud.pubsublite.v1.AdminServiceSettings; import com.google.cloud.pubsublite.v1.PublisherServiceClient; import com.google.cloud.pubsublite.v1.PublisherServiceSettings; +import com.google.pubsub.kafka.common.ConnectorCredentialsProvider; +import java.io.IOException; import java.util.Map; import java.util.Optional; -import org.apache.kafka.common.config.ConfigValue; class PublisherFactoryImpl implements PublisherFactory { private static final Framework FRAMEWORK = Framework.of("KAFKA_CONNECT"); - private PartitionPublisherFactory getPartitionPublisherFactory(TopicPath topic) { + private PartitionPublisherFactory getPartitionPublisherFactory( + TopicPath topic, ConnectorCredentialsProvider credentialsProvider) { return new PartitionPublisherFactory() { private Optional publisherServiceClient = Optional.empty(); @@ -61,9 +65,7 @@ private synchronized PublisherServiceClient getServiceClient() throws ApiExcepti addDefaultSettings( topic.location().extractRegion(), PublisherServiceSettings.newBuilder() - .setCredentialsProvider( - PublisherServiceSettings.defaultCredentialsProviderBuilder() - .build())))); + .setCredentialsProvider(credentialsProvider)))); return publisherServiceClient.get(); } catch (Throwable t) { throw toCanonical(t).underlying; @@ -95,25 +97,38 @@ public void close() {} @Override public Publisher newPublisher(Map params) { - Map config = ConfigDefs.config().validateAll(params); + Map config = ConfigDefs.config().parse(params); + ConnectorCredentialsProvider credentialsProvider = + ConnectorCredentialsProvider.fromConfig(config); CloudRegionOrZone location = - CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString()); + CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).toString()); PartitionCountWatchingPublisherSettings.Builder builder = PartitionCountWatchingPublisherSettings.newBuilder(); TopicPath topic = TopicPath.newBuilder() .setProject( - ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG).value()) - .project()) + ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG)).project()) .setLocation(location) - .setName(TopicName.of(config.get(ConfigDefs.TOPIC_NAME_FLAG).value().toString())) + .setName(TopicName.of(config.get(ConfigDefs.TOPIC_NAME_FLAG).toString())) .build(); builder.setTopic(topic); - builder.setPublisherFactory(getPartitionPublisherFactory(topic)); - builder.setAdminClient( - AdminClient.create( - AdminClientSettings.newBuilder().setRegion(location.extractRegion()).build())); - if (OrderingMode.valueOf(config.get(ConfigDefs.ORDERING_MODE_FLAG).value().toString()) + builder.setPublisherFactory(getPartitionPublisherFactory(topic, credentialsProvider)); + try { + builder.setAdminClient( + AdminClient.create( + AdminClientSettings.newBuilder() + .setRegion(location.extractRegion()) + .setServiceClient( + AdminServiceClient.create( + addDefaultSettings( + location.extractRegion(), + AdminServiceSettings.newBuilder() + .setCredentialsProvider(credentialsProvider)))) + .build())); + } catch (IOException e) { + throw new IllegalStateException(e); + } + if (OrderingMode.valueOf(config.get(ConfigDefs.ORDERING_MODE_FLAG).toString()) == OrderingMode.KAFKA) { builder.setRoutingPolicyFactory(KafkaPartitionRoutingPolicy::new); } diff --git a/src/main/java/com/google/pubsublite/kafka/source/ConfigDefs.java b/src/main/java/com/google/pubsublite/kafka/source/ConfigDefs.java index 2dbf1d3..3694266 100644 --- a/src/main/java/com/google/pubsublite/kafka/source/ConfigDefs.java +++ b/src/main/java/com/google/pubsublite/kafka/source/ConfigDefs.java @@ -15,6 +15,7 @@ */ package com.google.pubsublite.kafka.source; +import com.google.pubsub.kafka.common.ConnectorUtils; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -63,6 +64,18 @@ static ConfigDef config() { ConfigDef.Type.LONG, 20_000_000, Importance.MEDIUM, - "The number of outstanding bytes per-partition allowed. Set to 20MB by default."); + "The number of outstanding bytes per-partition allowed. Set to 20MB by default.") + .define( + ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG, + ConfigDef.Type.STRING, + "", + Importance.HIGH, + "The path to the GCP credentials file") + .define( + ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG, + ConfigDef.Type.STRING, + "", + Importance.HIGH, + "GCP JSON credentials"); } }