From 7c3f00fdd8f18ae975262544bd7947d28bf38455 Mon Sep 17 00:00:00 2001 From: Neil Xie Date: Mon, 11 Dec 2023 09:43:52 -0800 Subject: [PATCH 1/3] Wrap isSecure config in config map for kafka topic --- common/config/kafkaConfig.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/common/config/kafkaConfig.go b/common/config/kafkaConfig.go index 87069e13b62..b2515c91dd1 100644 --- a/common/config/kafkaConfig.go +++ b/common/config/kafkaConfig.go @@ -44,9 +44,8 @@ type ( // TopicConfig describes the mapping from topic to Kafka cluster TopicConfig struct { Cluster string `yaml:"cluster"` - // IsSecure describes whether the topic is secure, by default it is false - // If it is set to true, it allows to pass in the token to initialize secure producer for this topic - IsSecure bool `yaml:"isSecure,omitempty"` + // Properties map describes whether the topic properties, such as whether it is secure + Properties map[string]interface{} `yaml:"Properties,omitempty"` } // TopicList describes the topic names for each cluster @@ -103,7 +102,12 @@ func (k *KafkaConfig) GetTopicsForApplication(app string) TopicList { return k.Applications[app] } -// GetKafkaClusterSecureConfigForTopic gets isSecure status from topic -func (k *KafkaConfig) GetKafkaSecureConfigForTopic(topic string) bool { - return k.Topics[topic].IsSecure +// GetKafkaPropertiesForTopic gets properties from topic +func (k *KafkaConfig) GetKafkaPropertiesForTopic(topic string) map[string]interface{} { + topicConfig, ok := k.Topics[topic] + if !ok || topicConfig.Properties == nil { + // No properties for the specified topic in the config + return nil + } + return topicConfig.Properties } From 51fcca6930525110011e809c57379b18a3040f24 Mon Sep 17 00:00:00 2001 From: Neil Xie Date: Mon, 11 Dec 2023 09:53:49 -0800 Subject: [PATCH 2/3] Use lower case in yaml --- common/config/kafkaConfig.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/config/kafkaConfig.go b/common/config/kafkaConfig.go index b2515c91dd1..2282bb99be5 100644 --- a/common/config/kafkaConfig.go +++ b/common/config/kafkaConfig.go @@ -45,7 +45,7 @@ type ( TopicConfig struct { Cluster string `yaml:"cluster"` // Properties map describes whether the topic properties, such as whether it is secure - Properties map[string]interface{} `yaml:"Properties,omitempty"` + Properties map[string]interface{} `yaml:"properties,omitempty"` } // TopicList describes the topic names for each cluster From 9e4f83c4bbee32eadb49d86bfbb4da0331192953 Mon Sep 17 00:00:00 2001 From: Neil Xie Date: Mon, 11 Dec 2023 10:49:09 -0800 Subject: [PATCH 3/3] Retrieve value of property instead of return the whole map of properties --- common/config/kafkaConfig.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/common/config/kafkaConfig.go b/common/config/kafkaConfig.go index 2282bb99be5..86b1b4ebe49 100644 --- a/common/config/kafkaConfig.go +++ b/common/config/kafkaConfig.go @@ -45,7 +45,7 @@ type ( TopicConfig struct { Cluster string `yaml:"cluster"` // Properties map describes whether the topic properties, such as whether it is secure - Properties map[string]interface{} `yaml:"properties,omitempty"` + Properties map[string]any `yaml:"properties,omitempty"` } // TopicList describes the topic names for each cluster @@ -103,11 +103,19 @@ func (k *KafkaConfig) GetTopicsForApplication(app string) TopicList { } // GetKafkaPropertiesForTopic gets properties from topic -func (k *KafkaConfig) GetKafkaPropertiesForTopic(topic string) map[string]interface{} { +func (k *KafkaConfig) GetKafkaPropertyForTopic(topic string, property string) any { topicConfig, ok := k.Topics[topic] if !ok || topicConfig.Properties == nil { // No properties for the specified topic in the config return nil } - return topicConfig.Properties + + // retrieve the property from the topic properties + propertyValue, ok := topicConfig.Properties[property] + if !ok { + // Property not found + return nil + } + + return propertyValue }