From 311920a8921bfb68a8b95fa23bfc60806f08b151 Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Mon, 23 Oct 2023 10:33:42 +0530 Subject: [PATCH 01/15] startSystemWorkflowInformer in ci and job --- pkg/controller/controller.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index ac611099..8b6b0409 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -117,6 +117,13 @@ type ExternalCdConfig struct { Namespace string `env:"CD_EXTERNAL_NAMESPACE" envDefault:""` } +type ExternalCiConfig struct { + External bool `env:"CI_EXTERNAL_REST_LISTENER" envDefault:"false"` + Token string `env:"CI_EXTERNAL_ORCHESTRATOR_TOKEN" envDefault:""` + ListenerUrl string `env:"CI_EXTERNAL_LISTENER_URL" envDefault:"http://devtroncd-orchestrator-service-prod.devtroncd:80"` + Namespace string `env:"CI_EXTERNAL_NAMESPACE" envDefault:""` +} + type AcdConfig struct { ACDNamespace string `env:"ACD_NAMESPACE" envDefault:"devtroncd"` ACDInformer bool `env:"ACD_INFORMER" envDefault:"true"` @@ -168,6 +175,11 @@ func Start(conf *config.Config, eventHandler handlers.Handler) { } else { namespace = ciCfg.DefaultNamespace } + clusterCfg := &ClusterConfig{} + err = env.Parse(clusterCfg) + if clusterCfg.ClusterType == ClusterTypeAll && !externalCD.External { + startSystemWorkflowInformer(logger) + } stopCh := make(chan struct{}) defer close(stopCh) startWorkflowInformer(namespace, logger, pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, stopCh, dynamicClient, externalCD) From 826963379ff67d12d26de229a779b550d5c56623 Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Mon, 23 Oct 2023 13:02:12 +0530 Subject: [PATCH 02/15] Start informer for ci --- pkg/informer/K8sInformer.go | 73 +++++++++++++++++++++++++++++++++---- 1 file changed, 65 insertions(+), 8 deletions(-) diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index f79a2165..012cd6cc 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -44,7 +44,7 @@ type K8sInformer interface { startSystemWorkflowInformerForCluster(clusterInfo ClusterInfo) error syncSystemWorkflowInformer(clusterId int) error stopSystemWorkflowInformer(clusterId int) - startSystemWorkflowInformer(clusterId int) error + startSystemWorkflowInformerForCd(clusterId int) error BuildInformerForAllClusters() error } @@ -78,7 +78,8 @@ func (impl *K8sInformerImpl) BuildInformerForAllClusters() error { return err } for _, model := range models { - impl.startSystemWorkflowInformer(model.Id) + impl.startSystemWorkflowInformerForCd(model.Id) + impl.startSystemWorkflowInformerForCi(model.Id) } return nil } @@ -100,7 +101,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCluster(clusterInfo C restConfig.Insecure = true } - err := impl.startSystemWorkflowInformer(clusterInfo.ClusterId) + err := impl.startSystemWorkflowInformerForCd(clusterInfo.ClusterId) if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { impl.logger.Error("error in creating informer for new cluster", "err", err) return err @@ -209,7 +210,7 @@ func (impl *K8sInformerImpl) handleClusterChangeEvent(secretObject *coreV1.Secre var err error if string(action) == ADD { - err = impl.startSystemWorkflowInformer(clusterId) + err = impl.startSystemWorkflowInformerForCd(clusterId) if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { impl.logger.Error("error in adding informer for cluster", "id", clusterId, "err", err) return @@ -236,7 +237,7 @@ func (impl *K8sInformerImpl) syncSystemWorkflowInformer(clusterId int) error { impl.stopSystemWorkflowInformer(clusterInfo.Id) impl.logger.Debugw("informer stopped", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id) //create new informer for cluster with new config - err = impl.startSystemWorkflowInformer(clusterId) + err = impl.startSystemWorkflowInformerForCd(clusterId) if err != nil { impl.logger.Errorw("error in starting informer for ", "cluster name", clusterInfo.ClusterName) return err @@ -253,11 +254,11 @@ func (impl *K8sInformerImpl) stopSystemWorkflowInformer(clusterId int) { return } -func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int) error { +func (impl *K8sInformerImpl) startSystemWorkflowInformerForCd(clusterId int) error { clusterInfo, err := impl.clusterRepository.FindById(clusterId) if err != nil { - impl.logger.Errorw("error in fetching cluster","clusterId",clusterId, "err", err) + impl.logger.Errorw("error in fetching cluster", "clusterId", clusterId, "err", err) return err } @@ -309,6 +310,62 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int) error { return nil } +func (impl *K8sInformerImpl) startSystemWorkflowInformerForCi(clusterId int) error { + + clusterInfo, err := impl.clusterRepository.FindById(clusterId) + if err != nil { + impl.logger.Errorw("error in fetching cluster", "clusterId", clusterId, "err", err) + return err + } + + if _, ok := impl.informerStopper[clusterId]; ok { + impl.logger.Debug(fmt.Sprintf("informer for %s already exist", clusterInfo.ClusterName)) + return errors.New(INFORMER_ALREADY_EXIST_MESSAGE) + } + impl.logger.Infow("starting informer for cluster", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName) + clusterClient, err := impl.getK8sClientForCluster(clusterInfo) + if err != nil { + return err + } + + labelOptions := kubeinformers.WithTweakListOptions(func(opts *metav1.ListOptions) { + opts.LabelSelector = "devtron.ai/purpose==workflow" + }) + informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(clusterClient, 15*time.Minute, labelOptions) + stopper := make(chan struct{}) + podInformer := informerFactory.Core().V1().Pods() + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj, newObj interface{}) { + if podObj, ok := newObj.(*coreV1.Pod); ok { + impl.logger.Debugw("Event received in Pods update informer", "time", time.Now(), "podObjStatus", podObj.Status) + nodeStatus := impl.assessNodeStatus(podObj) + workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus) + wfJson, err := json.Marshal(workflowStatus) + if err != nil { + impl.logger.Errorw("error occurred while marshalling workflowJson", "err", err) + return + } + impl.logger.Debugw("sending system executor ci workflow update event", "workflow", string(wfJson)) + if impl.pubSubClient == nil { + log.Println("don't publish") + return + } + + err = impl.pubSubClient.Publish(pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, string(wfJson)) + if err != nil { + impl.logger.Errorw("Error while publishing Request", "err", err) + return + } + impl.logger.Debug("cd workflow update sent") + } + }, + }) + informerFactory.Start(stopper) + impl.logger.Infow("informer started for cluster", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName) + impl.informerStopper[clusterId] = stopper + return nil +} + func (impl *K8sInformerImpl) getK8sClientForCluster(clusterInfo *repository.Cluster) (*kubernetes.Clientset, error) { restConfig := &rest.Config{} if clusterInfo.ClusterName == DEFAULT_CLUSTER { @@ -509,7 +566,7 @@ func (impl *K8sInformerImpl) getWorkflowStatus(podObj *coreV1.Pod, nodeStatus v1 workflowStatus.FinishedAt = nodeStatus.FinishedAt } workflowStatus.Phase = workflowPhase - nodeNameVsStatus := make(map[string]v1alpha1.NodeStatus,1) + nodeNameVsStatus := make(map[string]v1alpha1.NodeStatus, 1) nodeStatus.ID = podObj.Name nodeStatus.TemplateName = "cd" nodeStatus.Name = nodeStatus.ID From 66aee8406cb216528431150bb61a5ce4e3627cb7 Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Mon, 23 Oct 2023 13:28:07 +0530 Subject: [PATCH 03/15] Start informer for ci --- pkg/informer/K8sInformer.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index 012cd6cc..a3171343 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -107,6 +107,12 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCluster(clusterInfo C return err } + err = impl.startSystemWorkflowInformerForCi(clusterInfo.ClusterId) + if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { + impl.logger.Error("error in creating informer for new cluster", "err", err) + return err + } + return nil } @@ -215,6 +221,11 @@ func (impl *K8sInformerImpl) handleClusterChangeEvent(secretObject *coreV1.Secre impl.logger.Error("error in adding informer for cluster", "id", clusterId, "err", err) return } + err = impl.startSystemWorkflowInformerForCi(clusterId) + if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { + impl.logger.Error("error in adding informer for cluster", "id", clusterId, "err", err) + return + } } else if string(action) == UPDATE { err = impl.syncSystemWorkflowInformer(clusterId) if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { @@ -242,6 +253,11 @@ func (impl *K8sInformerImpl) syncSystemWorkflowInformer(clusterId int) error { impl.logger.Errorw("error in starting informer for ", "cluster name", clusterInfo.ClusterName) return err } + err = impl.startSystemWorkflowInformerForCi(clusterId) + if err != nil { + impl.logger.Errorw("error in starting informer for ", "cluster name", clusterInfo.ClusterName) + return err + } return nil } From cec471a947d5007ee8cb8720b19f24178c9462d6 Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Mon, 23 Oct 2023 14:05:17 +0530 Subject: [PATCH 04/15] Start informer for ci --- pkg/informer/K8sInformer.go | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index a3171343..ab800234 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -78,7 +78,7 @@ func (impl *K8sInformerImpl) BuildInformerForAllClusters() error { return err } for _, model := range models { - impl.startSystemWorkflowInformerForCd(model.Id) + //impl.startSystemWorkflowInformerForCd(model.Id) impl.startSystemWorkflowInformerForCi(model.Id) } return nil @@ -101,13 +101,13 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCluster(clusterInfo C restConfig.Insecure = true } - err := impl.startSystemWorkflowInformerForCd(clusterInfo.ClusterId) - if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { - impl.logger.Error("error in creating informer for new cluster", "err", err) - return err - } + //err := impl.startSystemWorkflowInformerForCd(clusterInfo.ClusterId) + //if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { + // impl.logger.Error("error in creating informer for new cluster", "err", err) + // return err + //} - err = impl.startSystemWorkflowInformerForCi(clusterInfo.ClusterId) + err := impl.startSystemWorkflowInformerForCi(clusterInfo.ClusterId) if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { impl.logger.Error("error in creating informer for new cluster", "err", err) return err @@ -216,12 +216,12 @@ func (impl *K8sInformerImpl) handleClusterChangeEvent(secretObject *coreV1.Secre var err error if string(action) == ADD { - err = impl.startSystemWorkflowInformerForCd(clusterId) - if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { - impl.logger.Error("error in adding informer for cluster", "id", clusterId, "err", err) - return - } - err = impl.startSystemWorkflowInformerForCi(clusterId) + //err = impl.startSystemWorkflowInformerForCd(clusterId) + //if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { + // impl.logger.Error("error in adding informer for cluster", "id", clusterId, "err", err) + // return + //} + err := impl.startSystemWorkflowInformerForCi(clusterId) if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { impl.logger.Error("error in adding informer for cluster", "id", clusterId, "err", err) return @@ -248,11 +248,11 @@ func (impl *K8sInformerImpl) syncSystemWorkflowInformer(clusterId int) error { impl.stopSystemWorkflowInformer(clusterInfo.Id) impl.logger.Debugw("informer stopped", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id) //create new informer for cluster with new config - err = impl.startSystemWorkflowInformerForCd(clusterId) - if err != nil { - impl.logger.Errorw("error in starting informer for ", "cluster name", clusterInfo.ClusterName) - return err - } + //err = impl.startSystemWorkflowInformerForCd(clusterId) + //if err != nil { + // impl.logger.Errorw("error in starting informer for ", "cluster name", clusterInfo.ClusterName) + // return err + //} err = impl.startSystemWorkflowInformerForCi(clusterId) if err != nil { impl.logger.Errorw("error in starting informer for ", "cluster name", clusterInfo.ClusterName) From 351b7643c9ba650d51ca2e01e8b9b5ed5f1f3bdd Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Mon, 23 Oct 2023 15:28:42 +0530 Subject: [PATCH 05/15] add logger --- pkg/informer/K8sInformer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index ab800234..63067b23 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -354,6 +354,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCi(clusterId int) err UpdateFunc: func(oldObj, newObj interface{}) { if podObj, ok := newObj.(*coreV1.Pod); ok { impl.logger.Debugw("Event received in Pods update informer", "time", time.Now(), "podObjStatus", podObj.Status) + impl.logger.Debugw("podObj", podObj) nodeStatus := impl.assessNodeStatus(podObj) workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus) wfJson, err := json.Marshal(workflowStatus) From aa1356d825da4101b1df3705b74031e961acee2e Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Mon, 23 Oct 2023 15:50:56 +0530 Subject: [PATCH 06/15] add logger --- pkg/informer/K8sInformer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index 63067b23..273f0031 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -354,7 +354,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCi(clusterId int) err UpdateFunc: func(oldObj, newObj interface{}) { if podObj, ok := newObj.(*coreV1.Pod); ok { impl.logger.Debugw("Event received in Pods update informer", "time", time.Now(), "podObjStatus", podObj.Status) - impl.logger.Debugw("podObj", podObj) + impl.logger.Debugw("podObj", "podObj", podObj) nodeStatus := impl.assessNodeStatus(podObj) workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus) wfJson, err := json.Marshal(workflowStatus) From e93e0ed2ce3e7204f3468a686b4eeafa5585bbaa Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Mon, 23 Oct 2023 16:06:38 +0530 Subject: [PATCH 07/15] add logger --- pkg/informer/K8sInformer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index 273f0031..7b9df445 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -354,7 +354,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCi(clusterId int) err UpdateFunc: func(oldObj, newObj interface{}) { if podObj, ok := newObj.(*coreV1.Pod); ok { impl.logger.Debugw("Event received in Pods update informer", "time", time.Now(), "podObjStatus", podObj.Status) - impl.logger.Debugw("podObj", "podObj", podObj) + impl.logger.Debugw("podObj", "podObjName", podObj.Name) nodeStatus := impl.assessNodeStatus(podObj) workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus) wfJson, err := json.Marshal(workflowStatus) From bcded5a1a1eef3109fd91eb65ff0988bf9254961 Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Mon, 23 Oct 2023 16:29:27 +0530 Subject: [PATCH 08/15] templateName --- pkg/informer/K8sInformer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index 7b9df445..844ffb85 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -299,7 +299,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCd(clusterId int) err if podObj, ok := newObj.(*coreV1.Pod); ok { impl.logger.Debugw("Event received in Pods update informer", "time", time.Now(), "podObjStatus", podObj.Status) nodeStatus := impl.assessNodeStatus(podObj) - workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus) + workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus, "cd") wfJson, err := json.Marshal(workflowStatus) if err != nil { impl.logger.Errorw("error occurred while marshalling workflowJson", "err", err) @@ -356,7 +356,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCi(clusterId int) err impl.logger.Debugw("Event received in Pods update informer", "time", time.Now(), "podObjStatus", podObj.Status) impl.logger.Debugw("podObj", "podObjName", podObj.Name) nodeStatus := impl.assessNodeStatus(podObj) - workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus) + workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus, "ci") wfJson, err := json.Marshal(workflowStatus) if err != nil { impl.logger.Errorw("error occurred while marshalling workflowJson", "err", err) @@ -573,7 +573,7 @@ func (impl *K8sInformerImpl) inferFailedReason(pod *coreV1.Pod) (v1alpha1.NodePh return v1alpha1.NodeSucceeded, "" } -func (impl *K8sInformerImpl) getWorkflowStatus(podObj *coreV1.Pod, nodeStatus v1alpha1.NodeStatus) *v1alpha1.WorkflowStatus { +func (impl *K8sInformerImpl) getWorkflowStatus(podObj *coreV1.Pod, nodeStatus v1alpha1.NodeStatus, templateName string) *v1alpha1.WorkflowStatus { workflowStatus := &v1alpha1.WorkflowStatus{} workflowPhase := v1alpha1.WorkflowPhase(nodeStatus.Phase) if workflowPhase == v1alpha1.WorkflowPending { @@ -585,7 +585,7 @@ func (impl *K8sInformerImpl) getWorkflowStatus(podObj *coreV1.Pod, nodeStatus v1 workflowStatus.Phase = workflowPhase nodeNameVsStatus := make(map[string]v1alpha1.NodeStatus, 1) nodeStatus.ID = podObj.Name - nodeStatus.TemplateName = "cd" + nodeStatus.TemplateName = templateName nodeStatus.Name = nodeStatus.ID nodeStatus.BoundaryID = impl.getPodOwnerName(podObj) nodeNameVsStatus[podObj.Name] = nodeStatus From 80d5078a2615649c0baf85eca852b464e3ca99db Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Mon, 23 Oct 2023 16:47:47 +0530 Subject: [PATCH 09/15] CiCd workflow informer --- pkg/informer/K8sInformer.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index 844ffb85..cd9f1b67 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -78,7 +78,7 @@ func (impl *K8sInformerImpl) BuildInformerForAllClusters() error { return err } for _, model := range models { - //impl.startSystemWorkflowInformerForCd(model.Id) + impl.startSystemWorkflowInformerForCd(model.Id) impl.startSystemWorkflowInformerForCi(model.Id) } return nil @@ -101,13 +101,13 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCluster(clusterInfo C restConfig.Insecure = true } - //err := impl.startSystemWorkflowInformerForCd(clusterInfo.ClusterId) - //if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { - // impl.logger.Error("error in creating informer for new cluster", "err", err) - // return err - //} + err := impl.startSystemWorkflowInformerForCd(clusterInfo.ClusterId) + if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { + impl.logger.Error("error in creating informer for new cluster", "err", err) + return err + } - err := impl.startSystemWorkflowInformerForCi(clusterInfo.ClusterId) + err = impl.startSystemWorkflowInformerForCi(clusterInfo.ClusterId) if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { impl.logger.Error("error in creating informer for new cluster", "err", err) return err @@ -216,11 +216,11 @@ func (impl *K8sInformerImpl) handleClusterChangeEvent(secretObject *coreV1.Secre var err error if string(action) == ADD { - //err = impl.startSystemWorkflowInformerForCd(clusterId) - //if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { - // impl.logger.Error("error in adding informer for cluster", "id", clusterId, "err", err) - // return - //} + err = impl.startSystemWorkflowInformerForCd(clusterId) + if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { + impl.logger.Error("error in adding informer for cluster", "id", clusterId, "err", err) + return + } err := impl.startSystemWorkflowInformerForCi(clusterId) if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { impl.logger.Error("error in adding informer for cluster", "id", clusterId, "err", err) @@ -248,11 +248,11 @@ func (impl *K8sInformerImpl) syncSystemWorkflowInformer(clusterId int) error { impl.stopSystemWorkflowInformer(clusterInfo.Id) impl.logger.Debugw("informer stopped", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id) //create new informer for cluster with new config - //err = impl.startSystemWorkflowInformerForCd(clusterId) - //if err != nil { - // impl.logger.Errorw("error in starting informer for ", "cluster name", clusterInfo.ClusterName) - // return err - //} + err = impl.startSystemWorkflowInformerForCd(clusterId) + if err != nil { + impl.logger.Errorw("error in starting informer for ", "cluster name", clusterInfo.ClusterName) + return err + } err = impl.startSystemWorkflowInformerForCi(clusterId) if err != nil { impl.logger.Errorw("error in starting informer for ", "cluster name", clusterInfo.ClusterName) From 44f885d5aee249c2af3e6cf725f9a7d65cffe031 Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Mon, 23 Oct 2023 17:19:32 +0530 Subject: [PATCH 10/15] separate informer for Ci and Cd --- pkg/informer/K8sInformer.go | 42 ++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index cd9f1b67..faefa114 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -43,7 +43,7 @@ const ( type K8sInformer interface { startSystemWorkflowInformerForCluster(clusterInfo ClusterInfo) error syncSystemWorkflowInformer(clusterId int) error - stopSystemWorkflowInformer(clusterId int) + stopSystemWorkflowInformerCd(clusterId int) startSystemWorkflowInformerForCd(clusterId int) error BuildInformerForAllClusters() error } @@ -51,7 +51,8 @@ type K8sInformer interface { type K8sInformerImpl struct { logger *zap.SugaredLogger mutex sync.Mutex - informerStopper map[int]chan struct{} + CdInformerStopper map[int]chan struct{} + CiInformerStopper map[int]chan struct{} clusterRepository repository.ClusterRepository DefaultK8sConfig *rest.Config pubSubClient *pubsub.PubSubClientServiceImpl @@ -65,7 +66,8 @@ func NewK8sInformerImpl(logger *zap.SugaredLogger, clusterRepository repository. } defaultK8sConfig, _ := utils.GetDefaultK8sConfig("kubeconfigK8s") informerFactory.DefaultK8sConfig = defaultK8sConfig - informerFactory.informerStopper = make(map[int]chan struct{}) + informerFactory.CdInformerStopper = make(map[int]chan struct{}) + informerFactory.CiInformerStopper = make(map[int]chan struct{}) return informerFactory } @@ -197,11 +199,17 @@ func (impl *K8sInformerImpl) handleClusterDelete(clusterId int) bool { impl.logger.Errorw("Error in fetching cluster by id", "cluster-id ", clusterId, "err", err) return true } - impl.stopSystemWorkflowInformer(deleteClusterInfo.Id) + impl.stopSystemWorkflowInformerCd(deleteClusterInfo.Id) if err != nil { impl.logger.Errorw("error in updating informer for cluster", "id", clusterId, "err", err) return true } + impl.stopSystemWorkflowInformerCi(deleteClusterInfo.Id) + if err != nil { + impl.logger.Errorw("error in updating informer for cluster", "id", clusterId, "err", err) + return true + } + return false } @@ -245,7 +253,8 @@ func (impl *K8sInformerImpl) syncSystemWorkflowInformer(clusterId int) error { } //before creating new informer for cluster, close existing one impl.logger.Debugw("stopping informer for cluster - ", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id) - impl.stopSystemWorkflowInformer(clusterInfo.Id) + impl.stopSystemWorkflowInformerCd(clusterInfo.Id) + impl.stopSystemWorkflowInformerCi(clusterInfo.Id) impl.logger.Debugw("informer stopped", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id) //create new informer for cluster with new config err = impl.startSystemWorkflowInformerForCd(clusterId) @@ -261,11 +270,20 @@ func (impl *K8sInformerImpl) syncSystemWorkflowInformer(clusterId int) error { return nil } -func (impl *K8sInformerImpl) stopSystemWorkflowInformer(clusterId int) { - stopper := impl.informerStopper[clusterId] +func (impl *K8sInformerImpl) stopSystemWorkflowInformerCd(clusterId int) { + stopper := impl.CdInformerStopper[clusterId] + if stopper != nil { + close(stopper) + delete(impl.CdInformerStopper, clusterId) + } + return +} + +func (impl *K8sInformerImpl) stopSystemWorkflowInformerCi(clusterId int) { + stopper := impl.CiInformerStopper[clusterId] if stopper != nil { close(stopper) - delete(impl.informerStopper, clusterId) + delete(impl.CiInformerStopper, clusterId) } return } @@ -278,7 +296,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCd(clusterId int) err return err } - if _, ok := impl.informerStopper[clusterId]; ok { + if _, ok := impl.CdInformerStopper[clusterId]; ok { impl.logger.Debug(fmt.Sprintf("informer for %s already exist", clusterInfo.ClusterName)) return errors.New(INFORMER_ALREADY_EXIST_MESSAGE) } @@ -322,7 +340,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCd(clusterId int) err }) informerFactory.Start(stopper) impl.logger.Infow("informer started for cluster", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName) - impl.informerStopper[clusterId] = stopper + impl.CdInformerStopper[clusterId] = stopper return nil } @@ -334,7 +352,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCi(clusterId int) err return err } - if _, ok := impl.informerStopper[clusterId]; ok { + if _, ok := impl.CiInformerStopper[clusterId]; ok { impl.logger.Debug(fmt.Sprintf("informer for %s already exist", clusterInfo.ClusterName)) return errors.New(INFORMER_ALREADY_EXIST_MESSAGE) } @@ -379,7 +397,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCi(clusterId int) err }) informerFactory.Start(stopper) impl.logger.Infow("informer started for cluster", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName) - impl.informerStopper[clusterId] = stopper + impl.CiInformerStopper[clusterId] = stopper return nil } From af0d1a68dc10a34c94d7f147f62eb9155e412fa4 Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Wed, 25 Oct 2023 11:30:52 +0530 Subject: [PATCH 11/15] refactoring --- pkg/controller/controller.go | 7 ----- pkg/informer/K8sInformer.go | 55 +++++++++++++++--------------------- 2 files changed, 23 insertions(+), 39 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 8b6b0409..c201908c 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -117,13 +117,6 @@ type ExternalCdConfig struct { Namespace string `env:"CD_EXTERNAL_NAMESPACE" envDefault:""` } -type ExternalCiConfig struct { - External bool `env:"CI_EXTERNAL_REST_LISTENER" envDefault:"false"` - Token string `env:"CI_EXTERNAL_ORCHESTRATOR_TOKEN" envDefault:""` - ListenerUrl string `env:"CI_EXTERNAL_LISTENER_URL" envDefault:"http://devtroncd-orchestrator-service-prod.devtroncd:80"` - Namespace string `env:"CI_EXTERNAL_NAMESPACE" envDefault:""` -} - type AcdConfig struct { ACDNamespace string `env:"ACD_NAMESPACE" envDefault:"devtroncd"` ACDInformer bool `env:"ACD_INFORMER" envDefault:"true"` diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index faefa114..66ff1586 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -41,10 +41,6 @@ const ( ) type K8sInformer interface { - startSystemWorkflowInformerForCluster(clusterInfo ClusterInfo) error - syncSystemWorkflowInformer(clusterId int) error - stopSystemWorkflowInformerCd(clusterId int) - startSystemWorkflowInformerForCd(clusterId int) error BuildInformerForAllClusters() error } @@ -80,8 +76,7 @@ func (impl *K8sInformerImpl) BuildInformerForAllClusters() error { return err } for _, model := range models { - impl.startSystemWorkflowInformerForCd(model.Id) - impl.startSystemWorkflowInformerForCi(model.Id) + impl.startSystemWorkflowInformerForCiCd(model.Id) } return nil } @@ -103,13 +98,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCluster(clusterInfo C restConfig.Insecure = true } - err := impl.startSystemWorkflowInformerForCd(clusterInfo.ClusterId) - if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { - impl.logger.Error("error in creating informer for new cluster", "err", err) - return err - } - - err = impl.startSystemWorkflowInformerForCi(clusterInfo.ClusterId) + err := impl.startSystemWorkflowInformerForCiCd(clusterInfo.ClusterId) if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { impl.logger.Error("error in creating informer for new cluster", "err", err) return err @@ -224,13 +213,8 @@ func (impl *K8sInformerImpl) handleClusterChangeEvent(secretObject *coreV1.Secre var err error if string(action) == ADD { - err = impl.startSystemWorkflowInformerForCd(clusterId) - if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { - impl.logger.Error("error in adding informer for cluster", "id", clusterId, "err", err) - return - } - err := impl.startSystemWorkflowInformerForCi(clusterId) - if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { + err := impl.startSystemWorkflowInformerForCiCd(clusterId) + if err != nil { impl.logger.Error("error in adding informer for cluster", "id", clusterId, "err", err) return } @@ -257,12 +241,7 @@ func (impl *K8sInformerImpl) syncSystemWorkflowInformer(clusterId int) error { impl.stopSystemWorkflowInformerCi(clusterInfo.Id) impl.logger.Debugw("informer stopped", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id) //create new informer for cluster with new config - err = impl.startSystemWorkflowInformerForCd(clusterId) - if err != nil { - impl.logger.Errorw("error in starting informer for ", "cluster name", clusterInfo.ClusterName) - return err - } - err = impl.startSystemWorkflowInformerForCi(clusterId) + err = impl.startSystemWorkflowInformerForCiCd(clusterId) if err != nil { impl.logger.Errorw("error in starting informer for ", "cluster name", clusterInfo.ClusterName) return err @@ -288,7 +267,19 @@ func (impl *K8sInformerImpl) stopSystemWorkflowInformerCi(clusterId int) { return } -func (impl *K8sInformerImpl) startSystemWorkflowInformerForCd(clusterId int) error { +func (impl *K8sInformerImpl) startSystemWorkflowInformerForCiCd(clusterId int) error { + err := impl.startSystemWorkflowInformer(clusterId, impl.CdInformerStopper, pubsub.CD_WORKFLOW_STATUS_UPDATE) + if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { + return err + } + err = impl.startSystemWorkflowInformer(clusterId, impl.CiInformerStopper, pubsub.WORKFLOW_STATUS_UPDATE_TOPIC) + if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { + return err + } + return nil +} + +func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int, informerStopper map[int]chan struct{}, eventType string) error { clusterInfo, err := impl.clusterRepository.FindById(clusterId) if err != nil { @@ -296,8 +287,8 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCd(clusterId int) err return err } - if _, ok := impl.CdInformerStopper[clusterId]; ok { - impl.logger.Debug(fmt.Sprintf("informer for %s already exist", clusterInfo.ClusterName)) + if _, ok := informerStopper[clusterId]; ok { + impl.logger.Debug(fmt.Sprintf("%s informer for %s already exist", eventType, clusterInfo.ClusterName)) return errors.New(INFORMER_ALREADY_EXIST_MESSAGE) } impl.logger.Infow("starting informer for cluster", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName) @@ -329,7 +320,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCd(clusterId int) err return } - err = impl.pubSubClient.Publish(pubsub.CD_WORKFLOW_STATUS_UPDATE, string(wfJson)) + err = impl.pubSubClient.Publish(eventType, string(wfJson)) if err != nil { impl.logger.Errorw("Error while publishing Request", "err", err) return @@ -340,7 +331,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCd(clusterId int) err }) informerFactory.Start(stopper) impl.logger.Infow("informer started for cluster", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName) - impl.CdInformerStopper[clusterId] = stopper + informerStopper[clusterId] = stopper return nil } @@ -352,7 +343,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCi(clusterId int) err return err } - if _, ok := impl.CiInformerStopper[clusterId]; ok { + if _, ok := impl.CiInformerStopper[clusterId]; !ok { impl.logger.Debug(fmt.Sprintf("informer for %s already exist", clusterInfo.ClusterName)) return errors.New(INFORMER_ALREADY_EXIST_MESSAGE) } From 3b71666c0ae52d59f8dfb97b5372d68d1d64e520 Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Wed, 25 Oct 2023 11:37:19 +0530 Subject: [PATCH 12/15] remove unused codes --- pkg/informer/K8sInformer.go | 57 ------------------------------------- 1 file changed, 57 deletions(-) diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index 66ff1586..a95548fb 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -335,63 +335,6 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int, informer return nil } -func (impl *K8sInformerImpl) startSystemWorkflowInformerForCi(clusterId int) error { - - clusterInfo, err := impl.clusterRepository.FindById(clusterId) - if err != nil { - impl.logger.Errorw("error in fetching cluster", "clusterId", clusterId, "err", err) - return err - } - - if _, ok := impl.CiInformerStopper[clusterId]; !ok { - impl.logger.Debug(fmt.Sprintf("informer for %s already exist", clusterInfo.ClusterName)) - return errors.New(INFORMER_ALREADY_EXIST_MESSAGE) - } - impl.logger.Infow("starting informer for cluster", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName) - clusterClient, err := impl.getK8sClientForCluster(clusterInfo) - if err != nil { - return err - } - - labelOptions := kubeinformers.WithTweakListOptions(func(opts *metav1.ListOptions) { - opts.LabelSelector = "devtron.ai/purpose==workflow" - }) - informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(clusterClient, 15*time.Minute, labelOptions) - stopper := make(chan struct{}) - podInformer := informerFactory.Core().V1().Pods() - podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - UpdateFunc: func(oldObj, newObj interface{}) { - if podObj, ok := newObj.(*coreV1.Pod); ok { - impl.logger.Debugw("Event received in Pods update informer", "time", time.Now(), "podObjStatus", podObj.Status) - impl.logger.Debugw("podObj", "podObjName", podObj.Name) - nodeStatus := impl.assessNodeStatus(podObj) - workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus, "ci") - wfJson, err := json.Marshal(workflowStatus) - if err != nil { - impl.logger.Errorw("error occurred while marshalling workflowJson", "err", err) - return - } - impl.logger.Debugw("sending system executor ci workflow update event", "workflow", string(wfJson)) - if impl.pubSubClient == nil { - log.Println("don't publish") - return - } - - err = impl.pubSubClient.Publish(pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, string(wfJson)) - if err != nil { - impl.logger.Errorw("Error while publishing Request", "err", err) - return - } - impl.logger.Debug("cd workflow update sent") - } - }, - }) - informerFactory.Start(stopper) - impl.logger.Infow("informer started for cluster", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName) - impl.CiInformerStopper[clusterId] = stopper - return nil -} - func (impl *K8sInformerImpl) getK8sClientForCluster(clusterInfo *repository.Cluster) (*kubernetes.Clientset, error) { restConfig := &rest.Config{} if clusterInfo.ClusterName == DEFAULT_CLUSTER { From 5d6390dcd94838252a8146cdbf87487d8f081f73 Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Wed, 25 Oct 2023 11:50:24 +0530 Subject: [PATCH 13/15] add worklfow name --- pkg/informer/K8sInformer.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index a95548fb..28acba2f 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -38,6 +38,8 @@ const ( INFORMER_ALREADY_EXIST_MESSAGE = "INFORMER_ALREADY_EXIST" ADD = "add" UPDATE = "update" + CI_WORKFLOW_NAME = "ci" + CD_WORKFLOW_NAME = "cd" ) type K8sInformer interface { @@ -268,18 +270,18 @@ func (impl *K8sInformerImpl) stopSystemWorkflowInformerCi(clusterId int) { } func (impl *K8sInformerImpl) startSystemWorkflowInformerForCiCd(clusterId int) error { - err := impl.startSystemWorkflowInformer(clusterId, impl.CdInformerStopper, pubsub.CD_WORKFLOW_STATUS_UPDATE) + err := impl.startSystemWorkflowInformer(clusterId, impl.CdInformerStopper, pubsub.CD_WORKFLOW_STATUS_UPDATE, CD_WORKFLOW_NAME) if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { return err } - err = impl.startSystemWorkflowInformer(clusterId, impl.CiInformerStopper, pubsub.WORKFLOW_STATUS_UPDATE_TOPIC) + err = impl.startSystemWorkflowInformer(clusterId, impl.CiInformerStopper, pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, CI_WORKFLOW_NAME) if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) { return err } return nil } -func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int, informerStopper map[int]chan struct{}, eventType string) error { +func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int, informerStopper map[int]chan struct{}, eventType string, workflowName string) error { clusterInfo, err := impl.clusterRepository.FindById(clusterId) if err != nil { @@ -308,7 +310,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int, informer if podObj, ok := newObj.(*coreV1.Pod); ok { impl.logger.Debugw("Event received in Pods update informer", "time", time.Now(), "podObjStatus", podObj.Status) nodeStatus := impl.assessNodeStatus(podObj) - workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus, "cd") + workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus, workflowName) wfJson, err := json.Marshal(workflowStatus) if err != nil { impl.logger.Errorw("error occurred while marshalling workflowJson", "err", err) From e9c03ffad26e927d2f44bf0b15bf12895daac26c Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Wed, 25 Oct 2023 11:54:01 +0530 Subject: [PATCH 14/15] common stop for system workflow informer --- pkg/informer/K8sInformer.go | 22 ++++------------------ 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index 28acba2f..f29d8606 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -190,16 +190,7 @@ func (impl *K8sInformerImpl) handleClusterDelete(clusterId int) bool { impl.logger.Errorw("Error in fetching cluster by id", "cluster-id ", clusterId, "err", err) return true } - impl.stopSystemWorkflowInformerCd(deleteClusterInfo.Id) - if err != nil { - impl.logger.Errorw("error in updating informer for cluster", "id", clusterId, "err", err) - return true - } - impl.stopSystemWorkflowInformerCi(deleteClusterInfo.Id) - if err != nil { - impl.logger.Errorw("error in updating informer for cluster", "id", clusterId, "err", err) - return true - } + impl.stopSystemWorkflowInformer(deleteClusterInfo.Id) return false } @@ -239,8 +230,7 @@ func (impl *K8sInformerImpl) syncSystemWorkflowInformer(clusterId int) error { } //before creating new informer for cluster, close existing one impl.logger.Debugw("stopping informer for cluster - ", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id) - impl.stopSystemWorkflowInformerCd(clusterInfo.Id) - impl.stopSystemWorkflowInformerCi(clusterInfo.Id) + impl.stopSystemWorkflowInformer(clusterInfo.Id) impl.logger.Debugw("informer stopped", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id) //create new informer for cluster with new config err = impl.startSystemWorkflowInformerForCiCd(clusterId) @@ -251,17 +241,13 @@ func (impl *K8sInformerImpl) syncSystemWorkflowInformer(clusterId int) error { return nil } -func (impl *K8sInformerImpl) stopSystemWorkflowInformerCd(clusterId int) { +func (impl *K8sInformerImpl) stopSystemWorkflowInformer(clusterId int) { stopper := impl.CdInformerStopper[clusterId] if stopper != nil { close(stopper) delete(impl.CdInformerStopper, clusterId) } - return -} - -func (impl *K8sInformerImpl) stopSystemWorkflowInformerCi(clusterId int) { - stopper := impl.CiInformerStopper[clusterId] + stopper = impl.CiInformerStopper[clusterId] if stopper != nil { close(stopper) delete(impl.CiInformerStopper, clusterId) From 1ad014c552b5f583e831ca48e98061645be06fd8 Mon Sep 17 00:00:00 2001 From: Ashish-devtron Date: Wed, 25 Oct 2023 11:55:24 +0530 Subject: [PATCH 15/15] rename func --- pkg/informer/K8sInformer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/informer/K8sInformer.go b/pkg/informer/K8sInformer.go index f29d8606..7b805219 100644 --- a/pkg/informer/K8sInformer.go +++ b/pkg/informer/K8sInformer.go @@ -190,7 +190,7 @@ func (impl *K8sInformerImpl) handleClusterDelete(clusterId int) bool { impl.logger.Errorw("Error in fetching cluster by id", "cluster-id ", clusterId, "err", err) return true } - impl.stopSystemWorkflowInformer(deleteClusterInfo.Id) + impl.stopSystemWorkflowInformerForCiCd(deleteClusterInfo.Id) return false } @@ -230,7 +230,7 @@ func (impl *K8sInformerImpl) syncSystemWorkflowInformer(clusterId int) error { } //before creating new informer for cluster, close existing one impl.logger.Debugw("stopping informer for cluster - ", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id) - impl.stopSystemWorkflowInformer(clusterInfo.Id) + impl.stopSystemWorkflowInformerForCiCd(clusterInfo.Id) impl.logger.Debugw("informer stopped", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id) //create new informer for cluster with new config err = impl.startSystemWorkflowInformerForCiCd(clusterId) @@ -241,7 +241,7 @@ func (impl *K8sInformerImpl) syncSystemWorkflowInformer(clusterId int) error { return nil } -func (impl *K8sInformerImpl) stopSystemWorkflowInformer(clusterId int) { +func (impl *K8sInformerImpl) stopSystemWorkflowInformerForCiCd(clusterId int) { stopper := impl.CdInformerStopper[clusterId] if stopper != nil { close(stopper)