Skip to content

Commit

Permalink
feat: allow customizing TLS ServerName (#638)
Browse files Browse the repository at this point in the history
In some cases, it is meaningful to set the
tls.Config.ServerName field (for example when brokers 
are replying to clients advertising their IP as Kafka hostname).

Add configuration via environment variable to allow customizing
the ServerName field.

Signed-off-by: inge4pres <[email protected]>
  • Loading branch information
inge4pres authored Feb 25, 2025
1 parent 07cf390 commit 065e890
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
6 changes: 6 additions & 0 deletions kafka/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ func (cfg *CommonConfig) finalize() error {
cfg.TLS = &tls.Config{}
tlsInsecure := os.Getenv("KAFKA_TLS_INSECURE") == "true"
caCertPath := os.Getenv("KAFKA_TLS_CA_CERT_PATH")
if overriddenServerName, exists := os.LookupEnv("KAFKA_TLS_SERVER_NAME"); exists {
cfg.Logger.Debug("overriding TLS server name", zap.String("server_name", overriddenServerName))
cfg.TLS.ServerName = overriddenServerName
}
if tlsInsecure && (caCertPath != "" || certPath != "" || keyPath != "") {
errs = append(errs, errors.New(
"kafka: cannot set KAFKA_TLS_INSECURE when either of KAFKA_TLS_CA_CERT_PATH, KAFKA_TLS_CERT_PATH, or KAFKA_TLS_KEY_PATH are set",
Expand All @@ -240,6 +244,8 @@ func (cfg *CommonConfig) finalize() error {
cfg.TLS = nil
}
}
// Only configure SASL if it is not already set and when there is no
// intention to configure mTLS.
if cfg.SASL == nil && certPath == "" && keyPath == "" {
saslConfig := saslConfigProperties{
Mechanism: os.Getenv("KAFKA_SASL_MECHANISM"),
Expand Down
12 changes: 12 additions & 0 deletions kafka/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,18 @@ aws_session_token=IQoJb3JpZ2luX2IQoJb3JpZ2luX2IQoJb3JpZ2luX2IQoJb3JpZ2luX2IQoJb3
Logger: zap.NewNop(),
})
})

t.Run("tls_override_server_name", func(t *testing.T) {
t.Setenv("KAFKA_TLS_SERVER_NAME", "overriden.server.name")
assertValid(t, CommonConfig{
Brokers: []string{"broker"},
Logger: zap.NewNop().Named("kafka"),
TLS: &tls.Config{ServerName: "overriden.server.name"},
}, CommonConfig{
Brokers: []string{"broker"},
Logger: zap.NewNop(),
})
})
})

t.Run("configfile_from_env", func(t *testing.T) {
Expand Down

0 comments on commit 065e890

Please sign in to comment.