Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Run job using system exec #39

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
5 changes: 5 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
} else {
namespace = ciCfg.DefaultNamespace
}
clusterCfg := &ClusterConfig{}
err = env.Parse(clusterCfg)
Comment on lines +171 to +172
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do this once for both the CI and CD informers.

if clusterCfg.ClusterType == ClusterTypeAll && !externalCD.External {
startSystemWorkflowInformer(logger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is being called twice. Also, why is externalCD.External checked here?

}
stopCh := make(chan struct{})
defer close(stopCh)
startWorkflowInformer(namespace, logger, pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, stopCh, dynamicClient, externalCD)
Expand Down
74 changes: 44 additions & 30 deletions pkg/informer/K8sInformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,19 @@ const (
INFORMER_ALREADY_EXIST_MESSAGE = "INFORMER_ALREADY_EXIST"
ADD = "add"
UPDATE = "update"
CI_WORKFLOW_NAME = "ci"
CD_WORKFLOW_NAME = "cd"
)

type K8sInformer interface {
startSystemWorkflowInformerForCluster(clusterInfo ClusterInfo) error
syncSystemWorkflowInformer(clusterId int) error
stopSystemWorkflowInformer(clusterId int)
startSystemWorkflowInformer(clusterId int) error
BuildInformerForAllClusters() error
}

type K8sInformerImpl struct {
logger *zap.SugaredLogger
mutex sync.Mutex
informerStopper map[int]chan struct{}
CdInformerStopper map[int]chan struct{}
CiInformerStopper map[int]chan struct{}
clusterRepository repository.ClusterRepository
DefaultK8sConfig *rest.Config
pubSubClient *pubsub.PubSubClientServiceImpl
Expand All @@ -65,7 +64,8 @@ func NewK8sInformerImpl(logger *zap.SugaredLogger, clusterRepository repository.
}
defaultK8sConfig, _ := utils.GetDefaultK8sConfig("kubeconfigK8s")
informerFactory.DefaultK8sConfig = defaultK8sConfig
informerFactory.informerStopper = make(map[int]chan struct{})
informerFactory.CdInformerStopper = make(map[int]chan struct{})
informerFactory.CiInformerStopper = make(map[int]chan struct{})
return informerFactory
}

Expand All @@ -78,7 +78,7 @@ func (impl *K8sInformerImpl) BuildInformerForAllClusters() error {
return err
}
for _, model := range models {
impl.startSystemWorkflowInformer(model.Id)
impl.startSystemWorkflowInformerForCiCd(model.Id)
}
return nil
}
Expand All @@ -100,7 +100,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCluster(clusterInfo C
restConfig.Insecure = true
}

err := impl.startSystemWorkflowInformer(clusterInfo.ClusterId)
err := impl.startSystemWorkflowInformerForCiCd(clusterInfo.ClusterId)
if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) {
impl.logger.Error("error in creating informer for new cluster", "err", err)
return err
Expand Down Expand Up @@ -190,11 +190,8 @@ func (impl *K8sInformerImpl) handleClusterDelete(clusterId int) bool {
impl.logger.Errorw("Error in fetching cluster by id", "cluster-id ", clusterId, "err", err)
return true
}
impl.stopSystemWorkflowInformer(deleteClusterInfo.Id)
if err != nil {
impl.logger.Errorw("error in updating informer for cluster", "id", clusterId, "err", err)
return true
}
impl.stopSystemWorkflowInformerForCiCd(deleteClusterInfo.Id)

return false
}

Expand All @@ -209,8 +206,8 @@ func (impl *K8sInformerImpl) handleClusterChangeEvent(secretObject *coreV1.Secre
var err error

if string(action) == ADD {
err = impl.startSystemWorkflowInformer(clusterId)
if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) {
err := impl.startSystemWorkflowInformerForCiCd(clusterId)
if err != nil {
impl.logger.Error("error in adding informer for cluster", "id", clusterId, "err", err)
return
}
Expand All @@ -233,36 +230,53 @@ func (impl *K8sInformerImpl) syncSystemWorkflowInformer(clusterId int) error {
}
//before creating new informer for cluster, close existing one
impl.logger.Debugw("stopping informer for cluster - ", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id)
impl.stopSystemWorkflowInformer(clusterInfo.Id)
impl.stopSystemWorkflowInformerForCiCd(clusterInfo.Id)
impl.logger.Debugw("informer stopped", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id)
//create new informer for cluster with new config
err = impl.startSystemWorkflowInformer(clusterId)
err = impl.startSystemWorkflowInformerForCiCd(clusterId)
if err != nil {
impl.logger.Errorw("error in starting informer for ", "cluster name", clusterInfo.ClusterName)
return err
}
return nil
}

func (impl *K8sInformerImpl) stopSystemWorkflowInformer(clusterId int) {
stopper := impl.informerStopper[clusterId]
func (impl *K8sInformerImpl) stopSystemWorkflowInformerForCiCd(clusterId int) {
stopper := impl.CdInformerStopper[clusterId]
if stopper != nil {
close(stopper)
delete(impl.CdInformerStopper, clusterId)
}
stopper = impl.CiInformerStopper[clusterId]
if stopper != nil {
close(stopper)
delete(impl.informerStopper, clusterId)
delete(impl.CiInformerStopper, clusterId)
}
return
}

func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int) error {
// startSystemWorkflowInformerForCiCd starts the CD system workflow informer
// and then the CI system workflow informer for the given cluster.
//
// The CD informer is registered in impl.CdInformerStopper and publishes on
// pubsub.CD_WORKFLOW_STATUS_UPDATE; the CI informer is registered in
// impl.CiInformerStopper and publishes on pubsub.WORKFLOW_STATUS_UPDATE_TOPIC.
//
// An "informer already exists" result from either start is tolerated and does
// not abort the sequence. Any other error is returned immediately, so a real
// failure while starting the CD informer means the CI start is not attempted.
func (impl *K8sInformerImpl) startSystemWorkflowInformerForCiCd(clusterId int) error {
	err := impl.startSystemWorkflowInformer(clusterId, impl.CdInformerStopper, pubsub.CD_WORKFLOW_STATUS_UPDATE, CD_WORKFLOW_NAME)
	// Compare by message: startSystemWorkflowInformer builds this error with
	// errors.New on every call, and each errors.New value is distinct, so an
	// identity comparison against a fresh errors.New(...) can never match.
	if err != nil && err.Error() != INFORMER_ALREADY_EXIST_MESSAGE {
		return err
	}
	err = impl.startSystemWorkflowInformer(clusterId, impl.CiInformerStopper, pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, CI_WORKFLOW_NAME)
	if err != nil && err.Error() != INFORMER_ALREADY_EXIST_MESSAGE {
		return err
	}
	return nil
}

func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int, informerStopper map[int]chan struct{}, eventType string, workflowName string) error {

clusterInfo, err := impl.clusterRepository.FindById(clusterId)
if err != nil {
impl.logger.Errorw("error in fetching cluster","clusterId",clusterId, "err", err)
impl.logger.Errorw("error in fetching cluster", "clusterId", clusterId, "err", err)
return err
}

if _, ok := impl.informerStopper[clusterId]; ok {
impl.logger.Debug(fmt.Sprintf("informer for %s already exist", clusterInfo.ClusterName))
if _, ok := informerStopper[clusterId]; ok {
impl.logger.Debug(fmt.Sprintf("%s informer for %s already exist", eventType, clusterInfo.ClusterName))
return errors.New(INFORMER_ALREADY_EXIST_MESSAGE)
}
impl.logger.Infow("starting informer for cluster", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName)
Expand All @@ -282,7 +296,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int) error {
if podObj, ok := newObj.(*coreV1.Pod); ok {
impl.logger.Debugw("Event received in Pods update informer", "time", time.Now(), "podObjStatus", podObj.Status)
nodeStatus := impl.assessNodeStatus(podObj)
workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus)
workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus, workflowName)
wfJson, err := json.Marshal(workflowStatus)
if err != nil {
impl.logger.Errorw("error occurred while marshalling workflowJson", "err", err)
Expand All @@ -294,7 +308,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int) error {
return
}

err = impl.pubSubClient.Publish(pubsub.CD_WORKFLOW_STATUS_UPDATE, string(wfJson))
err = impl.pubSubClient.Publish(eventType, string(wfJson))
if err != nil {
impl.logger.Errorw("Error while publishing Request", "err", err)
return
Expand All @@ -305,7 +319,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int) error {
})
informerFactory.Start(stopper)
impl.logger.Infow("informer started for cluster", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName)
impl.informerStopper[clusterId] = stopper
informerStopper[clusterId] = stopper
return nil
}

Expand Down Expand Up @@ -499,7 +513,7 @@ func (impl *K8sInformerImpl) inferFailedReason(pod *coreV1.Pod) (v1alpha1.NodePh
return v1alpha1.NodeSucceeded, ""
}

func (impl *K8sInformerImpl) getWorkflowStatus(podObj *coreV1.Pod, nodeStatus v1alpha1.NodeStatus) *v1alpha1.WorkflowStatus {
func (impl *K8sInformerImpl) getWorkflowStatus(podObj *coreV1.Pod, nodeStatus v1alpha1.NodeStatus, templateName string) *v1alpha1.WorkflowStatus {
workflowStatus := &v1alpha1.WorkflowStatus{}
workflowPhase := v1alpha1.WorkflowPhase(nodeStatus.Phase)
if workflowPhase == v1alpha1.WorkflowPending {
Expand All @@ -509,9 +523,9 @@ func (impl *K8sInformerImpl) getWorkflowStatus(podObj *coreV1.Pod, nodeStatus v1
workflowStatus.FinishedAt = nodeStatus.FinishedAt
}
workflowStatus.Phase = workflowPhase
nodeNameVsStatus := make(map[string]v1alpha1.NodeStatus,1)
nodeNameVsStatus := make(map[string]v1alpha1.NodeStatus, 1)
nodeStatus.ID = podObj.Name
nodeStatus.TemplateName = "cd"
nodeStatus.TemplateName = templateName
nodeStatus.Name = nodeStatus.ID
nodeStatus.BoundaryID = impl.getPodOwnerName(podObj)
nodeNameVsStatus[podObj.Name] = nodeStatus
Expand Down