Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topicoperator unalterable topic configs #10182

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class EntityTopicOperator extends AbstractModel implements SupportsLoggin
/* test */ static final String ENV_VAR_CRUISE_CONTROL_PORT = "STRIMZI_CRUISE_CONTROL_PORT";
/* test */ static final String ENV_VAR_CRUISE_CONTROL_SSL_ENABLED = "STRIMZI_CRUISE_CONTROL_SSL_ENABLED";
/* test */ static final String ENV_VAR_CRUISE_CONTROL_AUTH_ENABLED = "STRIMZI_CRUISE_CONTROL_AUTH_ENABLED";
/* test */ static final String ENV_VAR_TOPIC_OPERATOR_UNALTERABLE_TOPIC_CONFIG = "STRIMZI_UNALTERABLE_TOPIC_CONFIG";

/* test */ static final String THROTTLED_REPLICAS_TOPIC_CONFIGS = "follower.replication.throttled.replicas,leader.replication.throttled.replicas";

// Kafka bootstrap servers can't be specified in the JSON
/* test */ final String kafkaBootstrapServers;
Expand Down Expand Up @@ -199,6 +202,7 @@ protected List<EnvVar> getEnvVars() {
varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_PORT, String.valueOf(CRUISE_CONTROL_API_PORT)));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_SSL_ENABLED, "true"));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_AUTH_ENABLED, "true"));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_TOPIC_OPERATOR_UNALTERABLE_TOPIC_CONFIG, THROTTLED_REPLICAS_TOPIC_CONFIGS));
// Truststore and API credentials are mounted in the container
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Map;

import static io.strimzi.operator.cluster.model.EntityTopicOperator.CRUISE_CONTROL_API_PORT;
import static io.strimzi.operator.cluster.model.EntityTopicOperator.THROTTLED_REPLICAS_TOPIC_CONFIGS;
import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlApiProperties.API_TO_ADMIN_NAME;
import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlApiProperties.API_TO_ADMIN_NAME_KEY;
import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlApiProperties.API_TO_ADMIN_PASSWORD_KEY;
Expand Down Expand Up @@ -337,6 +338,7 @@ public void testSetupWithCruiseControlEnabled() {
expectedEnvVars.add(new EnvVarBuilder().withName(EntityTopicOperator.ENV_VAR_CRUISE_CONTROL_PORT).withValue(String.valueOf(CRUISE_CONTROL_API_PORT)).build());
expectedEnvVars.add(new EnvVarBuilder().withName(EntityTopicOperator.ENV_VAR_CRUISE_CONTROL_SSL_ENABLED).withValue(Boolean.toString(true)).build());
expectedEnvVars.add(new EnvVarBuilder().withName(EntityTopicOperator.ENV_VAR_CRUISE_CONTROL_AUTH_ENABLED).withValue(Boolean.toString(true)).build());
expectedEnvVars.add(new EnvVarBuilder().withName(EntityTopicOperator.ENV_VAR_TOPIC_OPERATOR_UNALTERABLE_TOPIC_CONFIG).withValue(THROTTLED_REPLICAS_TOPIC_CONFIGS).build());
assertThat(entityTopicOperator.getEnvVars(), is(expectedEnvVars));

Container container = entityTopicOperator.createContainer(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@

import static io.strimzi.api.kafka.model.topic.ReplicasChangeState.ONGOING;
import static io.strimzi.api.kafka.model.topic.ReplicasChangeState.PENDING;
import static io.strimzi.operator.topic.TopicOperatorConfig.ALTERABLE_TOPIC_CONFIG;
import static io.strimzi.operator.topic.TopicOperatorConfig.UNALTERABLE_TOPIC_CONFIG;
import static io.strimzi.operator.topic.TopicOperatorUtil.startReconciliationTimer;
import static io.strimzi.operator.topic.TopicOperatorUtil.stopReconciliationTimer;
import static io.strimzi.operator.topic.TopicOperatorUtil.topicNames;
Expand Down Expand Up @@ -1169,6 +1171,15 @@ private void skipNonAlterableConfigs(Set<AlterConfigOp> alterConfigOps) {
alterConfigOps.removeIf(op -> !alterablePropertySet.contains(op.configEntry().name()));
}
}

var unalterableConfigs = config.unalterableTopicConfig();
if (unalterableConfigs != null && alterConfigOps != null && !unalterableConfigs.isEmpty()) {
if (!unalterableConfigs.equalsIgnoreCase("NONE")) {
Copy link
Author

@kahartma kahartma May 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I consciously excluded the ALL case here, because I didn't see why anyone would want to block all configs, but could be added of course

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A cluster admin wants to provide locked-up topic configs for some critical cluster (no delegation). That's a valid use case, so I would also add support for that.

Should we also change this method name to skipAlterConfigOps and update the Javadoc, as it now deals with both alterable and non-alterable configs?

var unalterablePropertySet = Arrays.stream(unalterableConfigs.replaceAll("\\s", "").split(","))
.collect(Collectors.toSet());
alterConfigOps.removeIf(op -> unalterablePropertySet.contains(op.configEntry().name()));
}
}
}

private static boolean hasConfig(KafkaTopic kt) {
Expand Down Expand Up @@ -1198,22 +1209,43 @@ private void addNonAlterableConfigsWarning(ReconcilableTopic reconcilableTopic,
List<Condition> conditions) {
var readOnlyConfigs = new ArrayList<>();
var alterableConfigs = config.alterableTopicConfig();
var unalterableConfigs = config.unalterableTopicConfig();

if (reconcilableTopic != null && reconcilableTopic.kt() != null && hasConfig(reconcilableTopic.kt())) {
if (alterableConfigs != null) {
if (alterableConfigs.equalsIgnoreCase("NONE")) {
reconcilableTopic.kt().getSpec().getConfig().forEach((key, value) -> readOnlyConfigs.add(key));
} else if (!alterableConfigs.equalsIgnoreCase("ALL") && !alterableConfigs.isBlank()) {
var alterablePropertySet = Arrays.stream(alterableConfigs.replaceAll("\\s", "").split(","))
.collect(Collectors.toSet());
reconcilableTopic.kt().getSpec().getConfig().forEach((key, value) -> {
if (!alterablePropertySet.contains(key)) {
readOnlyConfigs.add(key);
}
});
}
}

if (reconcilableTopic != null && reconcilableTopic.kt() != null
&& hasConfig(reconcilableTopic.kt()) && alterableConfigs != null) {
if (alterableConfigs.equalsIgnoreCase("NONE")) {
reconcilableTopic.kt().getSpec().getConfig().forEach((key, value) -> readOnlyConfigs.add(key));
} else if (!alterableConfigs.equalsIgnoreCase("ALL") && !alterableConfigs.isBlank()) {
var alterablePropertySet = Arrays.stream(alterableConfigs.replaceAll("\\s", "").split(","))
.collect(Collectors.toSet());
reconcilableTopic.kt().getSpec().getConfig().forEach((key, value) -> {
if (!alterablePropertySet.contains(key)) {
readOnlyConfigs.add(key);
}
});
if (unalterableConfigs != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would extract this logic in a separate method, for example addAlterableConfigWarning, and maybe rename addNonAlterableConfigsWarning to addUnalterableConfigWarning.

if (!unalterableConfigs.equalsIgnoreCase("NONE")) {
var unalterablePropertySet = Arrays.stream(unalterableConfigs.replaceAll("\\s", "").split(","))
.collect(Collectors.toSet());
reconcilableTopic.kt().getSpec().getConfig().forEach((key, value) -> {
if (unalterablePropertySet.contains(key)) {
readOnlyConfigs.add(key);
}
});
}
}
}

if (!ALTERABLE_TOPIC_CONFIG.defaultValue().equals(alterableConfigs) && !UNALTERABLE_TOPIC_CONFIG.defaultValue().equals(unalterableConfigs)) {
LOGGER.warnOp("{} and {} have non-default values. {}}: {}; {}} {}",
ALTERABLE_TOPIC_CONFIG.key(), UNALTERABLE_TOPIC_CONFIG.key(),
ALTERABLE_TOPIC_CONFIG.key(), alterableConfigs,
UNALTERABLE_TOPIC_CONFIG.key(), unalterableConfigs);
}

if (!readOnlyConfigs.isEmpty()) {
var properties = String.join(", ", readOnlyConfigs.toArray(new String[0]));
var message = "These .spec.config properties are not configurable: [" + properties + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,39 +35,40 @@
/**
* Config
*
* @param namespace The namespace that the operator will watch for KafkaTopics
* @param labelSelector The label selector that KafkaTopics must match
* @param bootstrapServers The Kafka bootstrap servers
* @param clientId The client Id to use for the Admin client
* @param fullReconciliationIntervalMs The resync interval, in ms
* @param tlsEnabled Whether the Admin client should be configured to use TLS
* @param truststoreLocation The location (path) of the Admin client's truststore.
* @param truststorePassword The password for the truststore at {@code truststoreLocation}.
* @param keystoreLocation The location (path) of the Admin client's keystore.
* @param keystorePassword The password for the keystore at {@code keystoreLocation}.
* @param saslUsername, The SASL username for the Admin client
* @param saslPassword, The SASL password for the Admin client
Comment on lines +38 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How moving saslUsername and saslPassword is related to this change? Can you please revert?

* @param namespace The namespace that the operator will watch for KafkaTopics
* @param labelSelector The label selector that KafkaTopics must match
* @param bootstrapServers The Kafka bootstrap servers
* @param clientId The client Id to use for the Admin client
* @param fullReconciliationIntervalMs The resync interval, in ms
* @param tlsEnabled Whether the Admin client should be configured to use TLS
* @param truststoreLocation The location (path) of the Admin client's truststore.
* @param truststorePassword The password for the truststore at {@code truststoreLocation}.
* @param keystoreLocation The location (path) of the Admin client's keystore.
* @param keystorePassword The password for the keystore at {@code keystoreLocation}.
* @param sslEndpointIdentificationAlgorithm The SSL endpoint identification algorithm
* @param saslEnabled Whether the Admin client should be configured to use SASL
* @param saslMechanism The SASL mechanism for the Admin client
* @param saslCustomConfigJson The SASL custom values for the Admin client when using alternate auth mechanisms.
* @param saslUsername, The SASL username for the Admin client
* @param saslPassword, The SASL password for the Admin client
* @param securityProtocol The security protocol for the Admin client
* @param useFinalizer Whether to use finalizers
* @param maxQueueSize The capacity of the queue
* @param maxBatchSize The maximum size of a reconciliation batch
* @param maxBatchLingerMs The maximum time to wait for a reconciliation batch to contain {@code maxBatchSize} items.
* @param enableAdditionalMetrics Whether to enable additional metrics
* @param cruiseControlEnabled Whether Cruise Control integration is enabled
* @param cruiseControlRackEnabled Whether the target Kafka cluster has rack awareness
* @param cruiseControlHostname Cruise Control hostname
* @param cruiseControlPort Cruise Control port
* @param cruiseControlSslEnabled Whether Cruise Control SSL encryption is enabled
* @param cruiseControlAuthEnabled Whether Cruise Control Basic authentication is enabled
* @param cruiseControlCrtFilePath Certificate chain to be trusted
* @param cruiseControlApiUserPath Api admin username file path
* @param cruiseControlApiPassPath Api admin password file path
* @param alterableTopicConfig Comma separated list of the alterable Kafka topic properties
* @param skipClusterConfigReview For some managed Kafka services the Cluster config is not callable, so this skips those calls.
* @param saslEnabled Whether the Admin client should be configured to use SASL
* @param saslMechanism The SASL mechanism for the Admin client
* @param saslCustomConfigJson The SASL custom values for the Admin client when using alternate auth mechanisms.
* @param securityProtocol The security protocol for the Admin client
* @param useFinalizer Whether to use finalizers
* @param maxQueueSize The capacity of the queue
* @param maxBatchSize The maximum size of a reconciliation batch
* @param maxBatchLingerMs The maximum time to wait for a reconciliation batch to contain {@code maxBatchSize} items.
* @param enableAdditionalMetrics Whether to enable additional metrics
* @param cruiseControlEnabled Whether Cruise Control integration is enabled
* @param cruiseControlRackEnabled Whether the target Kafka cluster has rack awareness
* @param cruiseControlHostname Cruise Control hostname
* @param cruiseControlPort Cruise Control port
* @param cruiseControlSslEnabled Whether Cruise Control SSL encryption is enabled
* @param cruiseControlAuthEnabled Whether Cruise Control Basic authentication is enabled
* @param cruiseControlCrtFilePath Certificate chain to be trusted
* @param cruiseControlApiUserPath Api admin username file path
* @param cruiseControlApiPassPath Api admin password file path
* @param alterableTopicConfig Comma separated list of Kafka topic configurations that are reconciled
* @param unalterableTopicConfig Comma separated list of Kafka topic configurations that are ignored
* @param skipClusterConfigReview For some managed Kafka services the Cluster config is not callable, so this skips those calls.
*/
public record TopicOperatorConfig(
String namespace,
Expand Down Expand Up @@ -103,6 +104,7 @@ public record TopicOperatorConfig(
String cruiseControlApiUserPath,
String cruiseControlApiPassPath,
String alterableTopicConfig,
String unalterableTopicConfig,
boolean skipClusterConfigReview
) {
private final static ReconciliationLogger LOGGER = ReconciliationLogger.create(TopicOperatorConfig.class);
Expand Down Expand Up @@ -134,6 +136,7 @@ public record TopicOperatorConfig(
static final ConfigParameter<Long> MAX_BATCH_LINGER_MS = new ConfigParameter<>("STRIMZI_MAX_BATCH_LINGER_MS", strictlyPositive(LONG), "100", CONFIG_VALUES);
static final ConfigParameter<Boolean> ENABLE_ADDITIONAL_METRICS = new ConfigParameter<>("STRIMZI_ENABLE_ADDITIONAL_METRICS", BOOLEAN, "false", CONFIG_VALUES);
static final ConfigParameter<String> ALTERABLE_TOPIC_CONFIG = new ConfigParameter<>("STRIMZI_ALTERABLE_TOPIC_CONFIG", STRING, "ALL", CONFIG_VALUES);
static final ConfigParameter<String> UNALTERABLE_TOPIC_CONFIG = new ConfigParameter<>("STRIMZI_UNALTERABLE_TOPIC_CONFIG", STRING, "NONE", CONFIG_VALUES);
Comment on lines 138 to +139
Copy link
Contributor

@fvaleri fvaleri May 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's important to provide some Javadoc. For example:

    /**
     * Use {@code alterableTopicConfig} to specify a comma separated list (allow list) of Kafka topic configurations that are reconciled, everything else is ignored.
     * The default value is "ALL", which means that all configurations are available; the opposite is "NONE", which means that all configurations are ignored.
     * <br/><br/>
     * This is useful in standalone mode when you have a Kafka service that restricts alter operations to a subset of Kafka topic configurations.
     */
    static final ConfigParameter<String> ALTERABLE_TOPIC_CONFIG = new ConfigParameter<>("STRIMZI_ALTERABLE_TOPIC_CONFIG", STRING, "ALL", CONFIG_VALUES);

    /**
     * Use {@code unalterableTopicConfig} to specify a comma separated list (deny list) of Kafka topic configurations that are ignored, everything else is reconciled.
     * The default value is "NONE", which means that all configurations are available; the opposite is "ALL", which means that all configurations are ignored.
     * <br/><br/>
     * This is useful when another application changes dynamic topic configurations directly in Kafka, and the operator should not interfere.
     */
    static final ConfigParameter<String> UNALTERABLE_TOPIC_CONFIG = new ConfigParameter<>("STRIMZI_UNALTERABLE_TOPIC_CONFIG", STRING, "NONE", CONFIG_VALUES);```

static final ConfigParameter<Boolean> SKIP_CLUSTER_CONFIG_REVIEW = new ConfigParameter<>("STRIMZI_SKIP_CLUSTER_CONFIG_REVIEW", BOOLEAN, "false", CONFIG_VALUES);
static final ConfigParameter<FeatureGates> FEATURE_GATES = new ConfigParameter<>("STRIMZI_FEATURE_GATES", parseFeatureGates(), "", CONFIG_VALUES);

Expand Down Expand Up @@ -212,6 +215,7 @@ public TopicOperatorConfig(Map<String, Object> map) {
get(map, CRUISE_CONTROL_API_USER_PATH),
get(map, CRUISE_CONTROL_API_PASS_PATH),
get(map, ALTERABLE_TOPIC_CONFIG),
get(map, UNALTERABLE_TOPIC_CONFIG),
get(map, SKIP_CLUSTER_CONFIG_REVIEW)
);
}
Expand Down Expand Up @@ -356,6 +360,7 @@ public String toString() {
"\n\tsaslMechanism='" + saslMechanism + '\'' +
"\n\tsaslCustomConfigJson='" + (saslCustomConfigJson == null ? null : mask) + '\'' +
"\n\talterableTopicConfig='" + alterableTopicConfig + '\'' +
"\n\tunalterableTopicConfig='" + unalterableTopicConfig + '\'' +
"\n\tskipClusterConfigReview='" + skipClusterConfigReview + '\'' +
"\n\tsaslUsername='" + saslUsername + '\'' +
"\n\tsaslPassword='" + mask + '\'' +
Expand Down
Loading