feat: add pubsublite sink support for credentials settings #251

Merged · 4 commits · Mar 31, 2023
@@ -23,29 +23,58 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;

 public class ConnectorCredentialsProvider implements CredentialsProvider {
   private static final List<String> GCP_SCOPE =
       Arrays.asList("https://www.googleapis.com/auth/cloud-platform");

-  private static final List<String> CPS_SCOPE =
-      Arrays.asList("https://www.googleapis.com/auth/pubsub");
+  CredentialsProvider impl;

-  GoogleCredentials credentials;
+  private ConnectorCredentialsProvider(CredentialsProvider impl) {
+    this.impl = impl;
+  }
+
+  public static ConnectorCredentialsProvider fromConfig(Map<String, Object> config) {
+    String credentialsPath = config.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG).toString();
+    String credentialsJson = config.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG).toString();
+    if (!credentialsPath.isEmpty()) {
+      if (!credentialsJson.isEmpty()) {
+        throw new IllegalArgumentException(
+            "May not set both "
+                + ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG
+                + " and "
+                + ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
+      }
+      return ConnectorCredentialsProvider.fromFile(credentialsPath);
+    } else if (!credentialsJson.isEmpty()) {
+      return ConnectorCredentialsProvider.fromJson(credentialsJson);
+    } else {
+      return ConnectorCredentialsProvider.fromDefault();
+    }
+  }

-  public void loadFromFile(String credentialPath) throws IOException {
-    this.credentials = GoogleCredentials.fromStream(new FileInputStream(credentialPath));
+  public static ConnectorCredentialsProvider fromFile(String credentialPath) {
+    return new ConnectorCredentialsProvider(
+        () ->
+            GoogleCredentials.fromStream(new FileInputStream(credentialPath))
+                .createScoped(GCP_SCOPE));
   }

-  public void loadJson(String credentialsJson) throws IOException {
-    ByteArrayInputStream bs = new ByteArrayInputStream(credentialsJson.getBytes());
-    this.credentials = credentials = GoogleCredentials.fromStream(bs);
+  public static ConnectorCredentialsProvider fromJson(String credentialsJson) {
+    return new ConnectorCredentialsProvider(
+        () ->
+            GoogleCredentials.fromStream(new ByteArrayInputStream(credentialsJson.getBytes()))
+                .createScoped(GCP_SCOPE));
   }

+  public static ConnectorCredentialsProvider fromDefault() {
+    return new ConnectorCredentialsProvider(
+        () -> GoogleCredentials.getApplicationDefault().createScoped(GCP_SCOPE));
+  }
+
   @Override
   public Credentials getCredentials() throws IOException {
-    if (this.credentials == null) {
-      return GoogleCredentials.getApplicationDefault().createScoped(this.CPS_SCOPE);
-    } else {
-      return this.credentials.createScoped(this.CPS_SCOPE);
-    }
+    return impl.getCredentials();
   }
 }
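A minimal usage sketch of the new factory methods (illustrative, not part of the diff; the file path is a placeholder, and the config keys are referenced through the existing ConnectorUtils constants):

```java
import com.google.auth.Credentials;
import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
import com.google.pubsub.kafka.common.ConnectorUtils;
import java.util.HashMap;
import java.util.Map;

public class CredentialsExample {
  public static void main(String[] args) throws Exception {
    Map<String, Object> config = new HashMap<>();
    // fromConfig() expects both keys to be present (the connectors now default them to "").
    // At most one may be non-empty; if both are empty, Application Default Credentials are used.
    config.put(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG, "/path/to/service-account.json");
    config.put(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG, "");

    ConnectorCredentialsProvider provider = ConnectorCredentialsProvider.fromConfig(config);
    // The key file is read and scoped when getCredentials() is called, not at construction.
    Credentials credentials = provider.getCredentials();
    System.out.println(credentials.getAuthenticationType());
  }
}
```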
@@ -233,13 +233,13 @@ public ConfigDef config()
         .define(
             ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
             Type.STRING,
-            null,
+            "",
             Importance.HIGH,
             "The path to the GCP credentials file")
         .define(
             ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
             Type.STRING,
-            null,
+            "",
             Importance.HIGH,
             "GCP JSON credentials")
         .define(
@@ -33,7 +33,6 @@
 import com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.OrderingKeySource;
 import com.google.pubsub.v1.ProjectTopicName;
 import com.google.pubsub.v1.PubsubMessage;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -136,24 +135,7 @@ public void start(Map<String, String> props) {
     orderingKeySource =
         OrderingKeySource.getEnum(
             (String) validatedProps.get(CloudPubSubSinkConnector.ORDERING_KEY_SOURCE));
-    gcpCredentialsProvider = new ConnectorCredentialsProvider();
-    String credentialsPath =
-        (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
-    String credentialsJson =
-        (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
-    if (credentialsPath != null) {
-      try {
-        gcpCredentialsProvider.loadFromFile(credentialsPath);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    } else if (credentialsJson != null) {
-      try {
-        gcpCredentialsProvider.loadJson(credentialsJson);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
+    gcpCredentialsProvider = ConnectorCredentialsProvider.fromConfig(validatedProps);
     if (publisher == null) {
       // Only do this if we did not use the constructor.
       createPublisher();
@@ -22,7 +22,6 @@
 import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
 import com.google.pubsub.kafka.common.ConnectorUtils;
 import com.google.pubsub.v1.GetSubscriptionRequest;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -138,25 +137,11 @@ public String version() {
   public void start(Map<String, String> props) {
     // Do a validation of configs here too so that we do not pass null objects to
     // verifySubscription().
-    config().parse(props);
-    String cpsProject = props.get(ConnectorUtils.CPS_PROJECT_CONFIG);
-    String cpsSubscription = props.get(CPS_SUBSCRIPTION_CONFIG);
-    String credentialsPath = props.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
-    String credentialsJson = props.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
-    ConnectorCredentialsProvider credentialsProvider = new ConnectorCredentialsProvider();
-    if (credentialsPath != null) {
-      try {
-        credentialsProvider.loadFromFile(credentialsPath);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    } else if (credentialsJson != null) {
-      try {
-        credentialsProvider.loadJson(credentialsJson);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
+    Map<String, Object> validated = config().parse(props);
+    String cpsProject = validated.get(ConnectorUtils.CPS_PROJECT_CONFIG).toString();
+    String cpsSubscription = validated.get(CPS_SUBSCRIPTION_CONFIG).toString();
+    ConnectorCredentialsProvider credentialsProvider =
+        ConnectorCredentialsProvider.fromConfig(validated);

     verifySubscription(cpsProject, cpsSubscription, credentialsProvider);
     this.props = props;
@@ -271,13 +256,13 @@ public ConfigDef config()
         .define(
             ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
             Type.STRING,
-            null,
+            "",
             Importance.HIGH,
             "The path to the GCP credentials file")
         .define(
             ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
             Type.STRING,
-            null,
+            "",
             Importance.HIGH,
             "GCP JSON credentials")
         .define(
@@ -34,7 +34,6 @@
 import com.google.pubsub.v1.ProjectSubscriptionName;
 import com.google.pubsub.v1.PubsubMessage;
 import com.google.pubsub.v1.ReceivedMessage;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -114,11 +113,6 @@ public void start(Map<String, String> props) {
     useKafkaHeaders = (Boolean) validatedProps.get(CloudPubSubSourceConnector.USE_KAFKA_HEADERS);
     makeOrderingKeyAttribute =
         (Boolean) validatedProps.get(CloudPubSubSourceConnector.CPS_MAKE_ORDERING_KEY_ATTRIBUTE);
-    ConnectorCredentialsProvider gcpCredentialsProvider = new ConnectorCredentialsProvider();
-    String gcpCredentialsFilePath =
-        (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG);
-    String credentialsJson =
-        (String) validatedProps.get(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG);
     boolean useStreamingPull =
         (Boolean) validatedProps.get(CloudPubSubSourceConnector.CPS_STREAMING_PULL_ENABLED);
     long streamingPullBytes =
@@ -136,19 +130,9 @@ public void start(Map<String, String> props) {
         (Long)
             validatedProps.get(
                 CloudPubSubSourceConnector.CPS_STREAMING_PULL_MAX_MS_PER_ACK_EXTENSION);
-    if (gcpCredentialsFilePath != null) {
-      try {
-        gcpCredentialsProvider.loadFromFile(gcpCredentialsFilePath);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    } else if (credentialsJson != null) {
-      try {
-        gcpCredentialsProvider.loadJson(credentialsJson);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
+    ConnectorCredentialsProvider gcpCredentialsProvider =
+        ConnectorCredentialsProvider.fromConfig(validatedProps);
+
     // Only do this if we did not set it through the constructor.
     if (subscriber == null) {
       if (useStreamingPull) {
15 changes: 14 additions & 1 deletion src/main/java/com/google/pubsublite/kafka/sink/ConfigDefs.java
@@ -15,6 +15,7 @@
  */
 package com.google.pubsublite.kafka.sink;

+import com.google.pubsub.kafka.common.ConnectorUtils;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;

@@ -49,6 +50,18 @@ static ConfigDef config() {
             ConfigDef.Type.STRING,
             OrderingMode.DEFAULT.name(),
             Importance.HIGH,
-            "The ordering mode to use for publishing to Pub/Sub Lite. If set to `KAFKA`, messages will be republished to the same partition index they were read from on the source topic. Note that this means the Pub/Sub Lite topic *must* have the same number of partitions as the source Kafka topic.");
+            "The ordering mode to use for publishing to Pub/Sub Lite. If set to `KAFKA`, messages will be republished to the same partition index they were read from on the source topic. Note that this means the Pub/Sub Lite topic *must* have the same number of partitions as the source Kafka topic.")
+        .define(
+            ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
+            ConfigDef.Type.STRING,
+            "",
+            Importance.HIGH,
+            "The path to the GCP credentials file")
+        .define(
+            ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
+            ConfigDef.Type.STRING,
+            "",
+            Importance.HIGH,
+            "GCP JSON credentials");
   }
 }
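Throughout this PR the two credentials settings default to the empty string rather than null. That matters because fromConfig() calls toString() on both parsed values: a null default would throw a NullPointerException for connectors that leave the settings unset, while an empty string falls through cleanly to Application Default Credentials. A standalone sketch of that behavior, using a one-setting ConfigDef for illustration rather than the connector's full definition:

```java
import com.google.pubsub.kafka.common.ConnectorUtils;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class EmptyDefaultSketch {
  public static void main(String[] args) {
    ConfigDef def =
        new ConfigDef()
            .define(
                ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
                Type.STRING,
                "", // with a null default here, the parsed value below would be null
                Importance.HIGH,
                "The path to the GCP credentials file");

    // The user supplies no credentials property at all:
    Map<String, Object> parsed = def.parse(new HashMap<String, String>());

    // The parsed value is "" rather than null, so downstream code such as
    // ConnectorCredentialsProvider.fromConfig() can call toString()/isEmpty() safely.
    System.out.println("parsed value = '" + parsed.get(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG) + "'");
  }
}
```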
@@ -37,17 +37,21 @@
 import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
 import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
 import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
+import com.google.cloud.pubsublite.v1.AdminServiceClient;
+import com.google.cloud.pubsublite.v1.AdminServiceSettings;
 import com.google.cloud.pubsublite.v1.PublisherServiceClient;
 import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
+import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
+import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.kafka.common.config.ConfigValue;

 class PublisherFactoryImpl implements PublisherFactory {

   private static final Framework FRAMEWORK = Framework.of("KAFKA_CONNECT");

-  private PartitionPublisherFactory getPartitionPublisherFactory(TopicPath topic) {
+  private PartitionPublisherFactory getPartitionPublisherFactory(
+      TopicPath topic, ConnectorCredentialsProvider credentialsProvider) {

     return new PartitionPublisherFactory() {
       private Optional<PublisherServiceClient> publisherServiceClient = Optional.empty();
@@ -61,9 +65,7 @@ private synchronized PublisherServiceClient getServiceClient() throws ApiException {
                 addDefaultSettings(
                     topic.location().extractRegion(),
                     PublisherServiceSettings.newBuilder()
-                        .setCredentialsProvider(
-                            PublisherServiceSettings.defaultCredentialsProviderBuilder()
-                                .build()))));
+                        .setCredentialsProvider(credentialsProvider))));
         return publisherServiceClient.get();
       } catch (Throwable t) {
         throw toCanonical(t).underlying;
@@ -95,25 +97,38 @@

   @Override
   public Publisher<MessageMetadata> newPublisher(Map<String, String> params) {
-    Map<String, ConfigValue> config = ConfigDefs.config().validateAll(params);
+    Map<String, Object> config = ConfigDefs.config().parse(params);
+    ConnectorCredentialsProvider credentialsProvider =
+        ConnectorCredentialsProvider.fromConfig(config);
     CloudRegionOrZone location =
-        CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).value().toString());
+        CloudRegionOrZone.parse(config.get(ConfigDefs.LOCATION_FLAG).toString());
     PartitionCountWatchingPublisherSettings.Builder builder =
         PartitionCountWatchingPublisherSettings.newBuilder();
     TopicPath topic =
         TopicPath.newBuilder()
             .setProject(
-                ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG).value())
-                    .project())
+                ProjectPath.parse("projects/" + config.get(ConfigDefs.PROJECT_FLAG)).project())
             .setLocation(location)
-            .setName(TopicName.of(config.get(ConfigDefs.TOPIC_NAME_FLAG).value().toString()))
+            .setName(TopicName.of(config.get(ConfigDefs.TOPIC_NAME_FLAG).toString()))
             .build();
     builder.setTopic(topic);
-    builder.setPublisherFactory(getPartitionPublisherFactory(topic));
-    builder.setAdminClient(
-        AdminClient.create(
-            AdminClientSettings.newBuilder().setRegion(location.extractRegion()).build()));
-    if (OrderingMode.valueOf(config.get(ConfigDefs.ORDERING_MODE_FLAG).value().toString())
+    builder.setPublisherFactory(getPartitionPublisherFactory(topic, credentialsProvider));
+    try {
+      builder.setAdminClient(
+          AdminClient.create(
+              AdminClientSettings.newBuilder()
+                  .setRegion(location.extractRegion())
+                  .setServiceClient(
+                      AdminServiceClient.create(
+                          addDefaultSettings(
+                              location.extractRegion(),
+                              AdminServiceSettings.newBuilder()
+                                  .setCredentialsProvider(credentialsProvider))))
+                  .build()));
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+    if (OrderingMode.valueOf(config.get(ConfigDefs.ORDERING_MODE_FLAG).toString())
         == OrderingMode.KAFKA) {
       builder.setRoutingPolicyFactory(KafkaPartitionRoutingPolicy::new);
     }
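ConnectorCredentialsProvider implements the GAX CredentialsProvider interface, which is why it can be handed straight to the generated settings builders for both the publisher and the admin client above. A reduced sketch of the same wiring in isolation (it omits the addDefaultSettings() call the connector uses to point the client at the regional endpoint):

```java
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
import java.io.IOException;

public class AdminSettingsSketch {
  public static void main(String[] args) throws IOException {
    // Uses Application Default Credentials; fromFile()/fromJson() plug in the same way.
    ConnectorCredentialsProvider credentials = ConnectorCredentialsProvider.fromDefault();

    // Any generated *Settings builder accepts a CredentialsProvider; build() declares IOException.
    AdminServiceSettings settings =
        AdminServiceSettings.newBuilder().setCredentialsProvider(credentials).build();
    System.out.println(settings.getCredentialsProvider());
  }
}
```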
@@ -15,6 +15,7 @@
  */
 package com.google.pubsublite.kafka.source;

+import com.google.pubsub.kafka.common.ConnectorUtils;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;

@@ -63,6 +64,18 @@ static ConfigDef config() {
             ConfigDef.Type.LONG,
             20_000_000,
             Importance.MEDIUM,
-            "The number of outstanding bytes per-partition allowed. Set to 20MB by default.");
+            "The number of outstanding bytes per-partition allowed. Set to 20MB by default.")
+        .define(
+            ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG,
+            ConfigDef.Type.STRING,
+            "",
+            Importance.HIGH,
+            "The path to the GCP credentials file")
+        .define(
+            ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG,
+            ConfigDef.Type.STRING,
+            "",
+            Importance.HIGH,
+            "GCP JSON credentials");
   }
 }
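Finally, the validation path in fromConfig(): supplying both credentials settings at once is rejected rather than silently preferring one. A small sketch of that behavior; the file path and inline JSON are dummy values:

```java
import com.google.pubsub.kafka.common.ConnectorCredentialsProvider;
import com.google.pubsub.kafka.common.ConnectorUtils;
import java.util.HashMap;
import java.util.Map;

public class BothSettingsRejected {
  public static void main(String[] args) {
    Map<String, Object> config = new HashMap<>();
    config.put(ConnectorUtils.GCP_CREDENTIALS_FILE_PATH_CONFIG, "/path/to/service-account.json");
    config.put(ConnectorUtils.GCP_CREDENTIALS_JSON_CONFIG, "{\"type\":\"service_account\"}");
    try {
      ConnectorCredentialsProvider.fromConfig(config);
    } catch (IllegalArgumentException expected) {
      // fromConfig() refuses configs that set both the file path and the inline JSON.
      System.out.println(expected.getMessage());
    }
  }
}
```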