Skip to content

Commit

Permalink
feat: Passing callback func to get message client in WatchForChanges
Browse files Browse the repository at this point in the history
closes #468

Signed-off-by: Jack Chen <[email protected]>
  • Loading branch information
jackchenjc committed Jan 22, 2025
1 parent a0a2198 commit b6bb7df
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 66 deletions.
93 changes: 30 additions & 63 deletions bootstrap/config/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*******************************************************************************
* Copyright 2019 Dell Inc.
* Copyright 2023 Intel Corporation
* Copyright 2024 IOTech Ltd
* Copyright 2024-2025 IOTech Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -65,8 +65,6 @@ const (
deviceServicesKey = "device-services"

SecurityModeKey = "Mode"

configProviderTypeKeeper = "keeper"
)

var invalidRemoteHostsError = errors.New("-rsh/--remoteServiceHosts must contain 3 and only 3 comma seperated host names")
Expand Down Expand Up @@ -270,16 +268,16 @@ func (cp *Processor) Process(

// listen for changes on Writable
if useProvider {
cp.listenForPrivateChanges(serviceConfig, privateConfigClient, utils.BuildBaseKey(configStem, serviceKey), configProviderInfo.ServiceConfig().Type)
cp.listenForPrivateChanges(serviceConfig, privateConfigClient, utils.BuildBaseKey(configStem, serviceKey))
cp.lc.Infof("listening for private config changes")
cp.listenForCommonChanges(serviceConfig, cp.commonConfigClient, privateConfigClient, utils.BuildBaseKey(configStem, common.CoreCommonConfigServiceKey, allServicesKey), configProviderInfo.ServiceConfig().Type)
cp.listenForCommonChanges(serviceConfig, cp.commonConfigClient, privateConfigClient, utils.BuildBaseKey(configStem, common.CoreCommonConfigServiceKey, allServicesKey))
cp.lc.Infof("listening for all services common config changes")
if cp.appConfigClient != nil {
cp.listenForCommonChanges(serviceConfig, cp.appConfigClient, privateConfigClient, utils.BuildBaseKey(configStem, common.CoreCommonConfigServiceKey, appServicesKey), configProviderInfo.ServiceConfig().Type)
cp.listenForCommonChanges(serviceConfig, cp.appConfigClient, privateConfigClient, utils.BuildBaseKey(configStem, common.CoreCommonConfigServiceKey, appServicesKey))
cp.lc.Infof("listening for application service common config changes")
}
if cp.deviceConfigClient != nil {
cp.listenForCommonChanges(serviceConfig, cp.deviceConfigClient, privateConfigClient, utils.BuildBaseKey(configStem, common.CoreCommonConfigServiceKey, deviceServicesKey), configProviderInfo.ServiceConfig().Type)
cp.listenForCommonChanges(serviceConfig, cp.deviceConfigClient, privateConfigClient, utils.BuildBaseKey(configStem, common.CoreCommonConfigServiceKey, deviceServicesKey))
cp.lc.Infof("listening for device service common config changes")
}
}
Expand Down Expand Up @@ -652,28 +650,7 @@ func (cp *Processor) ListenForCustomConfigChanges(
updateStream := make(chan any)
defer close(updateStream)

var messageBus messaging.MessageClient
configProviderUrl := cp.flags.ConfigProviderUrl()
// check if the config provider type is keeper
if strings.HasPrefix(configProviderUrl, configProviderTypeKeeper) {
// there's no startupTimer for cp created by NewProcessorForCustomConfig
// add a new startupTimer here
if !cp.startupTimer.HasNotElapsed() {
cp.startupTimer = startup.NewStartUpTimer("")
}
for cp.startupTimer.HasNotElapsed() {
if msgClient := container.MessagingClientFrom(cp.dic.Get); msgClient != nil {
messageBus = msgClient
break
}
cp.startupTimer.SleepForInterval()
}
if messageBus == nil {
cp.lc.Error("unable to use MessageClient to watch for custom configuration changes")
return
}
}
go configClient.WatchForChanges(updateStream, errorStream, configToWatch, sectionName, messageBus)
go configClient.WatchForChanges(updateStream, errorStream, configToWatch, sectionName, cp.getMessageClient)

isFirstUpdate := true

Expand Down Expand Up @@ -772,7 +749,7 @@ func GetConfigFileLocation(lc logger.LoggingClient, flags flags.Common) string {
// service's configuration writable sub-struct. It's assumed the log level is universally part of the
// writable struct and this function explicitly updates the loggingClient's log level when new configuration changes
// are received.
func (cp *Processor) listenForPrivateChanges(serviceConfig interfaces.Configuration, configClient configuration.Client, baseKey string, configProviderType string) {
func (cp *Processor) listenForPrivateChanges(serviceConfig interfaces.Configuration, configClient configuration.Client, baseKey string) {
lc := cp.lc
isFirstUpdate := true

Expand All @@ -786,22 +763,7 @@ func (cp *Processor) listenForPrivateChanges(serviceConfig interfaces.Configurat
updateStream := make(chan any)
defer close(updateStream)

// get the MessageClient to be used in Keeper WatchForChanges method
var messageBus messaging.MessageClient
if configProviderType == configProviderTypeKeeper {
for cp.startupTimer.HasNotElapsed() {
if msgClient := container.MessagingClientFrom(cp.dic.Get); msgClient != nil {
messageBus = msgClient
break
}
cp.startupTimer.SleepForInterval()
}
if messageBus == nil {
lc.Error("unable to use MessageClient to watch for configuration changes")
return
}
}
go configClient.WatchForChanges(updateStream, errorStream, serviceConfig.EmptyWritablePtr(), writableKey, messageBus)
go configClient.WatchForChanges(updateStream, errorStream, serviceConfig.EmptyWritablePtr(), writableKey, cp.getMessageClient)

for {
select {
Expand Down Expand Up @@ -844,7 +806,7 @@ func (cp *Processor) listenForPrivateChanges(serviceConfig interfaces.Configurat
// listenForCommonChanges leverages the Configuration Provider client's WatchForChanges() method to receive changes to and update the
// service's common configuration writable sub-struct.
func (cp *Processor) listenForCommonChanges(fullServiceConfig interfaces.Configuration, configClient configuration.Client,
privateConfigClient configuration.Client, baseKey string, configProviderType string) {
privateConfigClient configuration.Client, baseKey string) {
lc := cp.lc
isFirstUpdate := true
baseKey = utils.BuildBaseKey(baseKey, writableKey)
Expand All @@ -863,22 +825,7 @@ func (cp *Processor) listenForCommonChanges(fullServiceConfig interfaces.Configu
updateStream := make(chan any)
defer close(updateStream)

// get the MessageClient to be used in Keeper WatchForChanges method
var messageBus messaging.MessageClient
if configProviderType == configProviderTypeKeeper {
for cp.startupTimer.HasNotElapsed() {
if msgClient := container.MessagingClientFrom(cp.dic.Get); msgClient != nil {
messageBus = msgClient
break
}
cp.startupTimer.SleepForInterval()
}
if messageBus == nil {
lc.Error("unable to use MessageClient to watch for configuration changes")
return
}
}
go commonConfigClient.WatchForChanges(updateStream, errorStream, fullServiceConfig.EmptyWritablePtr(), writableKey, messageBus)
go commonConfigClient.WatchForChanges(updateStream, errorStream, fullServiceConfig.EmptyWritablePtr(), writableKey, cp.getMessageClient)

for {
select {
Expand Down Expand Up @@ -1129,6 +1076,26 @@ func (cp *Processor) loadConfigFromProvider(serviceConfig interfaces.Configurati
return nil
}

// getMessageClient waits and gets the message client
func (cp *Processor) getMessageClient() (msgClient messaging.MessageClient) {
// there's no startupTimer for cp created by NewProcessorForCustomConfig
// add a new startupTimer here
if !cp.startupTimer.HasNotElapsed() {
cp.startupTimer = startup.NewStartUpTimer("")
}
for cp.startupTimer.HasNotElapsed() {
if msgClient = container.MessagingClientFrom(cp.dic.Get); msgClient != nil {
break
}
cp.startupTimer.SleepForInterval()
}
if msgClient == nil {
cp.lc.Error("unable to use MessageClient to watch for configuration changes")
return nil
}
return msgClient
}

// getSecretNamesChanged returns a slice of secretNames that have changed secrets or are new.
func getSecretNamesChanged(prevVals config.InsecureSecrets, curVals config.InsecureSecrets) []string {
var updatedNames []string
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23

require (
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/edgexfoundry/go-mod-configuration/v4 v4.0.0-dev.10
github.com/edgexfoundry/go-mod-configuration/v4 v4.0.0-dev.11
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.22
github.com/edgexfoundry/go-mod-messaging/v4 v4.0.0-dev.10
github.com/edgexfoundry/go-mod-registry/v4 v4.0.0-dev.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o=
github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk=
github.com/edgexfoundry/go-mod-configuration/v4 v4.0.0-dev.10 h1:DMv5LZDxcqUeb1dREMd/vK+reXmZYlpafgtm8XhYdHQ=
github.com/edgexfoundry/go-mod-configuration/v4 v4.0.0-dev.10/go.mod h1:ltUpMcOpJSzmabBtZox5qg1AK2wEikvZJyIBXtJ7mUQ=
github.com/edgexfoundry/go-mod-configuration/v4 v4.0.0-dev.11 h1:VCDeyEwhSb01dhJ3Rw0DbtV8pc18TILAoUGSFn7NR64=
github.com/edgexfoundry/go-mod-configuration/v4 v4.0.0-dev.11/go.mod h1:ltUpMcOpJSzmabBtZox5qg1AK2wEikvZJyIBXtJ7mUQ=
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.22 h1:XzDtbAmvp/v+DZlFoSgttnUQsIXGhlf+H8xgGxBClUA=
github.com/edgexfoundry/go-mod-core-contracts/v4 v4.0.0-dev.22/go.mod h1:D35HIMZkFFy82shKtPYaEL3Nn+ZNEjUjZI1RLn1j23E=
github.com/edgexfoundry/go-mod-messaging/v4 v4.0.0-dev.10 h1:xvDQDIJtmj/ZCmKzbAzg3h1F2ZdWz1MPoJSNfYZANGc=
Expand Down

0 comments on commit b6bb7df

Please sign in to comment.