diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/EntityTopicOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/EntityTopicOperator.java index 1ccaca88da5..7045caacd19 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/EntityTopicOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/EntityTopicOperator.java @@ -78,6 +78,9 @@ public class EntityTopicOperator extends AbstractModel implements SupportsLoggin /* test */ static final String ENV_VAR_CRUISE_CONTROL_PORT = "STRIMZI_CRUISE_CONTROL_PORT"; /* test */ static final String ENV_VAR_CRUISE_CONTROL_SSL_ENABLED = "STRIMZI_CRUISE_CONTROL_SSL_ENABLED"; /* test */ static final String ENV_VAR_CRUISE_CONTROL_AUTH_ENABLED = "STRIMZI_CRUISE_CONTROL_AUTH_ENABLED"; + /* test */ static final String ENV_VAR_TOPIC_OPERATOR_UNALTERABLE_TOPIC_CONFIG = "STRIMZI_UNALTERABLE_TOPIC_CONFIG"; + + /* test */ static final String THROTTLED_REPLICAS_TOPIC_CONFIGS = "follower.replication.throttled.replicas,leader.replication.throttled.replicas"; // Kafka bootstrap servers can't be specified in the JSON /* test */ final String kafkaBootstrapServers; @@ -199,6 +202,7 @@ protected List getEnvVars() { varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_PORT, String.valueOf(CRUISE_CONTROL_API_PORT))); varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_SSL_ENABLED, "true")); varList.add(ContainerUtils.createEnvVar(ENV_VAR_CRUISE_CONTROL_AUTH_ENABLED, "true")); + varList.add(ContainerUtils.createEnvVar(ENV_VAR_TOPIC_OPERATOR_UNALTERABLE_TOPIC_CONFIG, THROTTLED_REPLICAS_TOPIC_CONFIGS)); // Truststore and API credentials are mounted in the container } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/EntityTopicOperatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/EntityTopicOperatorTest.java index bdf4bcaf9eb..8c82eb400f3 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/EntityTopicOperatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/EntityTopicOperatorTest.java @@ -35,6 +35,7 @@ import java.util.Map; import static io.strimzi.operator.cluster.model.EntityTopicOperator.CRUISE_CONTROL_API_PORT; +import static io.strimzi.operator.cluster.model.EntityTopicOperator.THROTTLED_REPLICAS_TOPIC_CONFIGS; import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlApiProperties.API_TO_ADMIN_NAME; import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlApiProperties.API_TO_ADMIN_NAME_KEY; import static io.strimzi.operator.common.model.cruisecontrol.CruiseControlApiProperties.API_TO_ADMIN_PASSWORD_KEY; @@ -337,6 +338,7 @@ public void testSetupWithCruiseControlEnabled() { expectedEnvVars.add(new EnvVarBuilder().withName(EntityTopicOperator.ENV_VAR_CRUISE_CONTROL_PORT).withValue(String.valueOf(CRUISE_CONTROL_API_PORT)).build()); expectedEnvVars.add(new EnvVarBuilder().withName(EntityTopicOperator.ENV_VAR_CRUISE_CONTROL_SSL_ENABLED).withValue(Boolean.toString(true)).build()); expectedEnvVars.add(new EnvVarBuilder().withName(EntityTopicOperator.ENV_VAR_CRUISE_CONTROL_AUTH_ENABLED).withValue(Boolean.toString(true)).build()); + expectedEnvVars.add(new EnvVarBuilder().withName(EntityTopicOperator.ENV_VAR_TOPIC_OPERATOR_UNALTERABLE_TOPIC_CONFIG).withValue(THROTTLED_REPLICAS_TOPIC_CONFIGS).build()); assertThat(entityTopicOperator.getEnvVars(), is(expectedEnvVars)); Container container = entityTopicOperator.createContainer(null); diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java index b06a7fbcc8b..75213dffa75 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java @@ -63,6 +63,8 @@ import static io.strimzi.api.kafka.model.topic.ReplicasChangeState.ONGOING; import static io.strimzi.api.kafka.model.topic.ReplicasChangeState.PENDING; +import static io.strimzi.operator.topic.TopicOperatorConfig.ALTERABLE_TOPIC_CONFIG; +import static io.strimzi.operator.topic.TopicOperatorConfig.UNALTERABLE_TOPIC_CONFIG; import static io.strimzi.operator.topic.TopicOperatorUtil.startReconciliationTimer; import static io.strimzi.operator.topic.TopicOperatorUtil.stopReconciliationTimer; import static io.strimzi.operator.topic.TopicOperatorUtil.topicNames; @@ -1169,6 +1171,15 @@ private void skipNonAlterableConfigs(Set alterConfigOps) { alterConfigOps.removeIf(op -> !alterablePropertySet.contains(op.configEntry().name())); } } + + var unalterableConfigs = config.unalterableTopicConfig(); + if (unalterableConfigs != null && alterConfigOps != null && !unalterableConfigs.isEmpty()) { + if (!unalterableConfigs.equalsIgnoreCase("NONE")) { + var unalterablePropertySet = Arrays.stream(unalterableConfigs.replaceAll("\\s", "").split(",")) + .collect(Collectors.toSet()); + alterConfigOps.removeIf(op -> unalterablePropertySet.contains(op.configEntry().name())); + } + } } private static boolean hasConfig(KafkaTopic kt) { @@ -1198,22 +1209,43 @@ private void addNonAlterableConfigsWarning(ReconcilableTopic reconcilableTopic, List conditions) { var readOnlyConfigs = new ArrayList<>(); var alterableConfigs = config.alterableTopicConfig(); + var unalterableConfigs = config.unalterableTopicConfig(); + + if (reconcilableTopic != null && reconcilableTopic.kt() != null && hasConfig(reconcilableTopic.kt())) { + if (alterableConfigs != null) { + if (alterableConfigs.equalsIgnoreCase("NONE")) { + reconcilableTopic.kt().getSpec().getConfig().forEach((key, value) -> readOnlyConfigs.add(key)); + } else if (!alterableConfigs.equalsIgnoreCase("ALL") && !alterableConfigs.isBlank()) { + var alterablePropertySet = Arrays.stream(alterableConfigs.replaceAll("\\s", "").split(",")) + .collect(Collectors.toSet()); + reconcilableTopic.kt().getSpec().getConfig().forEach((key, value) -> { + if (!alterablePropertySet.contains(key)) { + readOnlyConfigs.add(key); + } + }); + } + } - if (reconcilableTopic != null && reconcilableTopic.kt() != null - && hasConfig(reconcilableTopic.kt()) && alterableConfigs != null) { - if (alterableConfigs.equalsIgnoreCase("NONE")) { - reconcilableTopic.kt().getSpec().getConfig().forEach((key, value) -> readOnlyConfigs.add(key)); - } else if (!alterableConfigs.equalsIgnoreCase("ALL") && !alterableConfigs.isBlank()) { - var alterablePropertySet = Arrays.stream(alterableConfigs.replaceAll("\\s", "").split(",")) - .collect(Collectors.toSet()); - reconcilableTopic.kt().getSpec().getConfig().forEach((key, value) -> { - if (!alterablePropertySet.contains(key)) { - readOnlyConfigs.add(key); - } - }); + if (unalterableConfigs != null) { + if (!unalterableConfigs.equalsIgnoreCase("NONE")) { + var unalterablePropertySet = Arrays.stream(unalterableConfigs.replaceAll("\\s", "").split(",")) + .collect(Collectors.toSet()); + reconcilableTopic.kt().getSpec().getConfig().forEach((key, value) -> { + if (unalterablePropertySet.contains(key)) { + readOnlyConfigs.add(key); + } + }); + } } } + if (!ALTERABLE_TOPIC_CONFIG.defaultValue().equals(alterableConfigs) && !UNALTERABLE_TOPIC_CONFIG.defaultValue().equals(unalterableConfigs)) { + LOGGER.warnOp("{} and {} have non-default values. {}}: {}; {}} {}", + ALTERABLE_TOPIC_CONFIG.key(), UNALTERABLE_TOPIC_CONFIG.key(), + ALTERABLE_TOPIC_CONFIG.key(), alterableConfigs, + UNALTERABLE_TOPIC_CONFIG.key(), unalterableConfigs); + } + if (!readOnlyConfigs.isEmpty()) { var properties = String.join(", ", readOnlyConfigs.toArray(new String[0])); var message = "These .spec.config properties are not configurable: [" + properties + "]"; diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorConfig.java b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorConfig.java index 735b4260233..cac65e45fed 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorConfig.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/TopicOperatorConfig.java @@ -35,39 +35,40 @@ /** * Config * - * @param namespace The namespace that the operator will watch for KafkaTopics - * @param labelSelector The label selector that KafkaTopics must match - * @param bootstrapServers The Kafka bootstrap servers - * @param clientId The client Id to use for the Admin client - * @param fullReconciliationIntervalMs The resync interval, in ms - * @param tlsEnabled Whether the Admin client should be configured to use TLS - * @param truststoreLocation The location (path) of the Admin client's truststore. - * @param truststorePassword The password for the truststore at {@code truststoreLocation}. - * @param keystoreLocation The location (path) of the Admin client's keystore. - * @param keystorePassword The password for the keystore at {@code keystoreLocation}. + * @param saslUsername, The SASL username for the Admin client + * @param saslPassword, The SASL password for the Admin client + * @param namespace The namespace that the operator will watch for KafkaTopics + * @param labelSelector The label selector that KafkaTopics must match + * @param bootstrapServers The Kafka bootstrap servers + * @param clientId The client Id to use for the Admin client + * @param fullReconciliationIntervalMs The resync interval, in ms + * @param tlsEnabled Whether the Admin client should be configured to use TLS + * @param truststoreLocation The location (path) of the Admin client's truststore. + * @param truststorePassword The password for the truststore at {@code truststoreLocation}. + * @param keystoreLocation The location (path) of the Admin client's keystore. + * @param keystorePassword The password for the keystore at {@code keystoreLocation}. * @param sslEndpointIdentificationAlgorithm The SSL endpoint identification algorithm - * @param saslEnabled Whether the Admin client should be configured to use SASL - * @param saslMechanism The SASL mechanism for the Admin client - * @param saslCustomConfigJson The SASL custom values for the Admin client when using alternate auth mechanisms. - * @param saslUsername, The SASL username for the Admin client - * @param saslPassword, The SASL password for the Admin client - * @param securityProtocol The security protocol for the Admin client - * @param useFinalizer Whether to use finalizers - * @param maxQueueSize The capacity of the queue - * @param maxBatchSize The maximum size of a reconciliation batch - * @param maxBatchLingerMs The maximum time to wait for a reconciliation batch to contain {@code maxBatchSize} items. - * @param enableAdditionalMetrics Whether to enable additional metrics - * @param cruiseControlEnabled Whether Cruise Control integration is enabled - * @param cruiseControlRackEnabled Whether the target Kafka cluster has rack awareness - * @param cruiseControlHostname Cruise Control hostname - * @param cruiseControlPort Cruise Control port - * @param cruiseControlSslEnabled Whether Cruise Control SSL encryption is enabled - * @param cruiseControlAuthEnabled Whether Cruise Control Basic authentication is enabled - * @param cruiseControlCrtFilePath Certificate chain to be trusted - * @param cruiseControlApiUserPath Api admin username file path - * @param cruiseControlApiPassPath Api admin password file path - * @param alterableTopicConfig Comma separated list of the alterable Kafka topic properties - * @param skipClusterConfigReview For some managed Kafka services the Cluster config is not callable, so this skips those calls. + * @param saslEnabled Whether the Admin client should be configured to use SASL + * @param saslMechanism The SASL mechanism for the Admin client + * @param saslCustomConfigJson The SASL custom values for the Admin client when using alternate auth mechanisms. + * @param securityProtocol The security protocol for the Admin client + * @param useFinalizer Whether to use finalizers + * @param maxQueueSize The capacity of the queue + * @param maxBatchSize The maximum size of a reconciliation batch + * @param maxBatchLingerMs The maximum time to wait for a reconciliation batch to contain {@code maxBatchSize} items. + * @param enableAdditionalMetrics Whether to enable additional metrics + * @param cruiseControlEnabled Whether Cruise Control integration is enabled + * @param cruiseControlRackEnabled Whether the target Kafka cluster has rack awareness + * @param cruiseControlHostname Cruise Control hostname + * @param cruiseControlPort Cruise Control port + * @param cruiseControlSslEnabled Whether Cruise Control SSL encryption is enabled + * @param cruiseControlAuthEnabled Whether Cruise Control Basic authentication is enabled + * @param cruiseControlCrtFilePath Certificate chain to be trusted + * @param cruiseControlApiUserPath Api admin username file path + * @param cruiseControlApiPassPath Api admin password file path + * @param alterableTopicConfig Comma separated list of Kafka topic configurations that are reconciled + * @param unalterableTopicConfig Comma separated list of Kafka topic configurations that are ignored + * @param skipClusterConfigReview For some managed Kafka services the Cluster config is not callable, so this skips those calls. */ public record TopicOperatorConfig( String namespace, @@ -103,6 +104,7 @@ public record TopicOperatorConfig( String cruiseControlApiUserPath, String cruiseControlApiPassPath, String alterableTopicConfig, + String unalterableTopicConfig, boolean skipClusterConfigReview ) { private final static ReconciliationLogger LOGGER = ReconciliationLogger.create(TopicOperatorConfig.class); @@ -134,6 +136,7 @@ public record TopicOperatorConfig( static final ConfigParameter MAX_BATCH_LINGER_MS = new ConfigParameter<>("STRIMZI_MAX_BATCH_LINGER_MS", strictlyPositive(LONG), "100", CONFIG_VALUES); static final ConfigParameter ENABLE_ADDITIONAL_METRICS = new ConfigParameter<>("STRIMZI_ENABLE_ADDITIONAL_METRICS", BOOLEAN, "false", CONFIG_VALUES); static final ConfigParameter ALTERABLE_TOPIC_CONFIG = new ConfigParameter<>("STRIMZI_ALTERABLE_TOPIC_CONFIG", STRING, "ALL", CONFIG_VALUES); + static final ConfigParameter UNALTERABLE_TOPIC_CONFIG = new ConfigParameter<>("STRIMZI_UNALTERABLE_TOPIC_CONFIG", STRING, "NONE", CONFIG_VALUES); static final ConfigParameter SKIP_CLUSTER_CONFIG_REVIEW = new ConfigParameter<>("STRIMZI_SKIP_CLUSTER_CONFIG_REVIEW", BOOLEAN, "false", CONFIG_VALUES); static final ConfigParameter FEATURE_GATES = new ConfigParameter<>("STRIMZI_FEATURE_GATES", parseFeatureGates(), "", CONFIG_VALUES); @@ -212,6 +215,7 @@ public TopicOperatorConfig(Map map) { get(map, CRUISE_CONTROL_API_USER_PATH), get(map, CRUISE_CONTROL_API_PASS_PATH), get(map, ALTERABLE_TOPIC_CONFIG), + get(map, UNALTERABLE_TOPIC_CONFIG), get(map, SKIP_CLUSTER_CONFIG_REVIEW) ); } @@ -356,6 +360,7 @@ public String toString() { "\n\tsaslMechanism='" + saslMechanism + '\'' + "\n\tsaslCustomConfigJson='" + (saslCustomConfigJson == null ? null : mask) + '\'' + "\n\talterableTopicConfig='" + alterableTopicConfig + '\'' + + "\n\tunalterableTopicConfig='" + unalterableTopicConfig + '\'' + "\n\tskipClusterConfigReview='" + skipClusterConfigReview + '\'' + "\n\tsaslUsername='" + saslUsername + '\'' + "\n\tsaslPassword='" + mask + '\'' + diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerTest.java b/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerTest.java index e68aa491fcd..34df643d122 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerTest.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerTest.java @@ -47,7 +47,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; @@ -500,9 +500,8 @@ public void shouldNotCallGetClusterConfigWhenDisabled() { } @ParameterizedTest - @NullSource - @ValueSource(strings = { "ALL", "" }) - public void shouldUpdateProperties(String alterableTopicConfig, KafkaCluster cluster) throws InterruptedException, ExecutionException { + @CsvSource(value = {",", "'',''", "ALL,", "ALL,''", "ALL,NONE", ",NONE", "'',NONE"}) + public void shouldUpdateProperties(String alterableTopicConfig, String unalterableTopicConfig, KafkaCluster cluster) throws InterruptedException, ExecutionException { admin[0] = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers())); admin[0].createTopics(List.of(new NewTopic(MY_TOPIC, 1, (short) 1).configs(Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy")))).all().get(); @@ -530,6 +529,7 @@ public void shouldUpdateProperties(String alterableTopicConfig, KafkaCluster clu Mockito.doReturn(NAMESPACE).when(config).namespace(); Mockito.doReturn(true).when(config).skipClusterConfigReview(); Mockito.doReturn(alterableTopicConfig).when(config).alterableTopicConfig(); + Mockito.doReturn(unalterableTopicConfig).when(config).unalterableTopicConfig(); var replicasChangeHandler = Mockito.mock(ReplicasChangeHandler.class); controller = new BatchingTopicController(config, Map.of("key", "VALUE"), adminSpy, client, metrics, replicasChangeHandler); @@ -652,4 +652,58 @@ public void shouldNotUpdatePropertiesNotInTheAlterableProperties(KafkaCluster cl assertEquals("NotConfigurable", notConfiguredCondition.getReason()); assertEquals("True", notConfiguredCondition.getStatus()); } + + @Test + public void shouldNotUpdatePropertiesInTheUnalterableProperties(KafkaCluster cluster) throws InterruptedException, ExecutionException { + admin[0] = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers())); + + admin[0].createTopics(List.of(new NewTopic(MY_TOPIC, 1, (short) 1).configs(Map.of( + TopicConfig.CLEANUP_POLICY_CONFIG, "delete", + "follower.replication.throttled.replicas", "*", + "leader.replication.throttled.replicas", "*" + )))).all().get(); + + Admin adminSpy = Mockito.spy(admin[0]); + + // Setup the KafkaTopic with 1 property change that is not in the alterableTopicConfig list. + var kt = Crds.topicOperation(client).resource( + new KafkaTopicBuilder() + .withNewMetadata() + .withName(MY_TOPIC) + .withNamespace(namespace(NAMESPACE)) + .addToLabels("key", "VALUE") + .endMetadata() + .withNewSpec() + .withConfig(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, "compact")) + .withPartitions(2) + .withReplicas(1) + .endSpec() + .build()).create(); + + var config = Mockito.mock(TopicOperatorConfig.class); + Mockito.doReturn(NAMESPACE).when(config).namespace(); + Mockito.doReturn(true).when(config).skipClusterConfigReview(); + Mockito.doReturn("cleanup.policy, follower.replication.throttled.replicas, leader.replication.throttled.replicas").when(config).unalterableTopicConfig(); + var replicasChangeHandler = Mockito.mock(ReplicasChangeHandler.class); + + controller = new BatchingTopicController(config, Map.of("key", "VALUE"), adminSpy, client, metrics, replicasChangeHandler); + List batch = List.of(new ReconcilableTopic(new Reconciliation("test", "KafkaTopic", NAMESPACE, MY_TOPIC), kt, topicName(kt))); + + controller.onUpdate(batch); + + Mockito.verify(adminSpy, Mockito.never()).incrementalAlterConfigs(any()); + + var updateTopic = Crds.topicOperation(client).inNamespace(NAMESPACE).withName("my-topic").get(); + + var conditionList = updateTopic.getStatus().getConditions(); + assertEquals(2, conditionList.size()); + + var readyCondition = conditionList.stream().filter(condition -> condition.getType().equals("Ready")).findFirst().get(); + assertEquals("True", readyCondition.getStatus()); + + var notConfiguredCondition = conditionList.stream().filter(condition -> condition.getType().equals("Warning")).findFirst().get(); + assertEquals("These .spec.config properties are not configurable: [cleanup.policy]", notConfiguredCondition.getMessage()); + assertEquals("NotConfigurable", notConfiguredCondition.getReason()); + assertEquals("True", notConfiguredCondition.getStatus()); + } } diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java index 1a0e8249a75..36ffac05ff0 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java @@ -3,7 +3,7 @@ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). */ package io.strimzi.operator.topic; - + import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.Resource; import io.kroxylicious.testing.kafka.api.KafkaCluster; @@ -1236,7 +1236,7 @@ private static TopicOperatorConfig topicOperatorConfig(String ns, KafkaCluster k useFinalizer, 100, 100, 10, false, new FeatureGates(""), false, false, "", 9090, false, false, "", "", "", - "all", false); + "all", "none", false); } @ParameterizedTest @@ -1990,7 +1990,7 @@ public void shouldTerminateIfQueueFull( true, 1, 100, 5_0000, false, new FeatureGates(""), false, false, "", 9090, false, false, "", "", "", - "all", false); + "all", "none", false); maybeStartOperator(config);