Skip to content

Commit

Permalink
upgrade-manager-v2: Load test fixes (#245)
Browse files Browse the repository at this point in the history
* upgrade-manager-v2: Move DrainManager back to Reconciler (#236)

* #2285: rollup CR statistic metrics in v2 (#218)

* #2285: rollup CR statistic metrics in v2

Signed-off-by: sbadla1 <[email protected]>

* #2285: updated metric flags

Signed-off-by: sbadla1 <[email protected]>

* #2285: updated metric flags

Signed-off-by: sbadla1 <[email protected]>
Signed-off-by: sbadiger <[email protected]>

* log cloud discovery failure

Signed-off-by: sbadiger <[email protected]>

* Create RollingUpgrade Context

Signed-off-by: sbadiger <[email protected]>

* rollingupgrade context

Signed-off-by: sbadiger <[email protected]>

* #2285: rollup CR statistic metrics in v2 (#218)

* #2285: rollup CR statistic metrics in v2

Signed-off-by: sbadla1 <[email protected]>

* #2285: updated metric flags

Signed-off-by: sbadla1 <[email protected]>

* #2285: updated metric flags

Signed-off-by: sbadla1 <[email protected]>
Signed-off-by: sbadiger <[email protected]>

* #2285: renamed some methods related to metrics (#224)

Signed-off-by: sbadla1 <[email protected]>
Signed-off-by: sbadiger <[email protected]>

* #2286: removed version from metric namespace (#227)

Signed-off-by: sbadla1 <[email protected]>
Signed-off-by: sbadiger <[email protected]>

* resolve compile errors due to merge conflict

Signed-off-by: sbadiger <[email protected]>

* move drain-manager to reconciler

Signed-off-by: sbadiger <[email protected]>

* initialize RollingUpgrade object

Signed-off-by: sbadiger <[email protected]>

* use bool instead of count for standby function

Signed-off-by: sbadiger <[email protected]>

* refactor in-progress and standby code

Signed-off-by: sbadiger <[email protected]>

* rename instance standby function

Signed-off-by: sbadiger <[email protected]>

* DrainManager changes in unit test files

Signed-off-by: sbadiger <[email protected]>

Co-authored-by: Sahil Badla <[email protected]>
Signed-off-by: sbadiger <[email protected]>

* V2 controller metrics concurrency fix (#231)

* Refine the metrics status

Signed-off-by: xshao <[email protected]>

* Refine the metrics status

Signed-off-by: xshao <[email protected]>

* Fix test case error

Signed-off-by: xshao <[email protected]>

* Use group instead of ASG

Signed-off-by: xshao <[email protected]>

* Ignore generated code

Signed-off-by: xshao <[email protected]>

* Ignore generated code

Signed-off-by: xshao <[email protected]>

* Fix the concurrent issue

Signed-off-by: xshao <[email protected]>

* Fix the concurrent issue

Signed-off-by: xshao <[email protected]>

* Move metrics related functions into RollingUpgradeContext

Signed-off-by: xshao <[email protected]>

* Move metrics related functions into RollingUpgradeContext

Signed-off-by: xshao <[email protected]>

* Move metrics related functions into upgrade_metrics.go

Signed-off-by: xshao <[email protected]>

* Move metrics related functions into metrics.go

Signed-off-by: xshao <[email protected]>
Signed-off-by: sbadiger <[email protected]>

* add missing parenthesis

Signed-off-by: sbadiger <[email protected]>

* load test fixes

Signed-off-by: sbadiger <[email protected]>

* handle scaling group not found

Signed-off-by: sbadiger <[email protected]>

* Update upgrade.go

Signed-off-by: sbadiger <[email protected]>

* log one level up

* remove double logging

Signed-off-by: sbadiger <[email protected]>
  • Loading branch information
shreyas-badiger authored Jun 2, 2021
1 parent 1fc5847 commit 18e0e75
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 45 deletions.
2 changes: 1 addition & 1 deletion controllers/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func createRollingUpgradeContext(r *RollingUpgradeReconciler) *RollingUpgradeCon
DrainGroup: drainGroup.(*sync.WaitGroup),
},
RollingUpgrade: rollingUpgrade,
metricsMutex: &sync.Mutex{},
metricsMutex: &sync.Mutex{},
}

}
Expand Down
19 changes: 0 additions & 19 deletions controllers/providers/aws/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,25 +96,6 @@ func GetInstanceIDs(instances []*autoscaling.Instance) []string {
return IDs
}

// func SelectInstancesByAZ(instances []*autoscaling.Group) *autoscaling.Instance {
// for _, instance := range group.Instances {
// selectedID := aws.StringValue(instance.InstanceId)
// if strings.EqualFold(instanceID, selectedID) {
// return instance
// }
// }
// return &autoscaling.Instance{}
// }

// func ListScalingInstanceIDs(group *autoscaling.Group) []string {
// instanceIDs := make([]string, 0)
// for _, instance := range group.Instances {
// instanceID := aws.StringValue(instance.InstanceId)
// instanceIDs = append(instanceIDs, instanceID)
// }
// return instanceIDs
// }

func GetTemplateLatestVersion(templates []*ec2.LaunchTemplate, templateName string) string {
for _, template := range templates {
name := aws.StringValue(template.LaunchTemplateName)
Expand Down
21 changes: 13 additions & 8 deletions controllers/rollingupgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package controllers

import (
"context"
"fmt"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -65,12 +66,13 @@ type RollingUpgradeAuthenticator struct {
// Reconcile reads that state of the cluster for a RollingUpgrade object and makes changes based on the state read
// and the details in the RollingUpgrade.Spec
func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
r.Info("***Reconciling***")
rollingUpgrade := &v1alpha1.RollingUpgrade{}
err := r.Get(ctx, req.NamespacedName, rollingUpgrade)
if err != nil {
if kerrors.IsNotFound(err) {
r.AdmissionMap.Delete(req.NamespacedName)
r.Info("deleted object from admission map", "name", req.NamespacedName)
r.AdmissionMap.Delete(fmt.Sprintf("%s", req.NamespacedName))
r.Info("rolling upgrade resource not found, deleted object from admission map", "name", req.NamespacedName)
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
Expand Down Expand Up @@ -108,7 +110,7 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
val := v.(string)
resource := k.(string)
if strings.EqualFold(val, scalingGroupName) && !strings.EqualFold(resource, rollingUpgrade.NamespacedName()) {
r.Info("object already being processed by existing resource", "resource", resource, "scalingGroup", scalingGroupName)
r.Info("object already being processed by existing resource", "resource", resource, "scalingGroup", scalingGroupName, "name", rollingUpgrade.NamespacedName())
inProgress = true
return false
}
Expand All @@ -120,8 +122,13 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{RequeueAfter: time.Second * 30}, nil
}

r.Info("admitted new rollingupgrade", "name", rollingUpgrade.NamespacedName(), "scalingGroup", scalingGroupName)
r.AdmissionMap.Store(rollingUpgrade.NamespacedName(), scalingGroupName)
// store the rolling upgrade in admission map
if _, present := r.AdmissionMap.LoadOrStore(rollingUpgrade.NamespacedName(), scalingGroupName); present == false {
r.Info("admitted new rolling upgrade", "scalingGroup", scalingGroupName, "update strategy", rollingUpgrade.Spec.Strategy, "name", rollingUpgrade.NamespacedName())
r.CacheConfig.FlushCache("autoscaling")
} else {
r.Info("operating on existing rolling upgrade", "scalingGroup", scalingGroupName, "update strategy", rollingUpgrade.Spec.Strategy, "name", rollingUpgrade.NamespacedName())
}
rollingUpgrade.SetCurrentStatus(v1alpha1.StatusInit)
common.SetMetricRollupInitOrRunning(rollingUpgrade.Name)

Expand All @@ -141,17 +148,15 @@ func (r *RollingUpgradeReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}
rollupCtx.Cloud = NewDiscoveredState(rollupCtx.Auth, rollupCtx.Logger)
if err := rollupCtx.Cloud.Discover(); err != nil {
r.Info("failed to discover the cloud", "name", rollingUpgrade.NamespacedName(), "scalingGroup", scalingGroupName)
r.Info("failed to discover the cloud", "scalingGroup", scalingGroupName, "name", rollingUpgrade.NamespacedName())
rollingUpgrade.SetCurrentStatus(v1alpha1.StatusError)
// Set prometheus metric cr_status_failed
common.SetMetricRollupFailed(rollingUpgrade.Name)
return ctrl.Result{}, err
}

// process node rotation
if err := rollupCtx.RotateNodes(); err != nil {
rollingUpgrade.SetCurrentStatus(v1alpha1.StatusError)
// Set prometheus metric cr_status_failed
common.SetMetricRollupFailed(rollingUpgrade.Name)
return ctrl.Result{}, err
}
Expand Down
42 changes: 25 additions & 17 deletions controllers/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ limitations under the License.
package controllers

import (
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -91,13 +91,21 @@ func (r *RollingUpgradeContext) RotateNodes() error {
var (
scalingGroup = awsprovider.SelectScalingGroup(r.RollingUpgrade.ScalingGroupName(), r.Cloud.ScalingGroups)
)
if reflect.DeepEqual(scalingGroup, &autoscaling.Group{}) {
return errors.Errorf("scaling group not found, scalingGroupName: %v", r.RollingUpgrade.ScalingGroupName())
}
r.Info(
"scaling group details",
"scalingGroup", r.RollingUpgrade.ScalingGroupName(),
"launchConfig", aws.StringValue(scalingGroup.LaunchConfigurationName),
"name", r.RollingUpgrade.NamespacedName(),
)

r.RollingUpgrade.SetTotalNodes(len(scalingGroup.Instances))

// check if all instances are rotated.
if !r.IsScalingGroupDrifted() {
r.RollingUpgrade.SetCurrentStatus(v1alpha1.StatusComplete)
// Set prometheus metric cr_status_completed
common.SetMetricRollupCompleted(r.RollingUpgrade.Name)
return nil
}
Expand Down Expand Up @@ -140,7 +148,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
// Add in-progress tag
r.Info("setting instances to in-progress", "batch", batchInstanceIDs, "instances(InService)", inServiceInstanceIDs, "name", r.RollingUpgrade.NamespacedName())
if err := r.Auth.TagEC2instances(inServiceInstanceIDs, instanceStateTagKey, inProgressTagValue); err != nil {
r.Error(err, "failed to set instancecs to in-progress", "batch", batchInstanceIDs, "instances(InService)", inServiceInstanceIDs, "name", r.RollingUpgrade.NamespacedName())
r.Error(err, "failed to set instances to in-progress", "batch", batchInstanceIDs, "instances(InService)", inServiceInstanceIDs, "name", r.RollingUpgrade.NamespacedName())
return false, err
}
// Standby
Expand Down Expand Up @@ -170,6 +178,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
r.Info("new node is yet to join the cluster", "name", r.RollingUpgrade.NamespacedName())
return true, nil
}
r.Info("desired nodes are ready", "name", r.RollingUpgrade.NamespacedName())

case v1alpha1.UpdateStrategyModeLazy:
for _, target := range batch {
Expand All @@ -190,7 +199,6 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
}
}

fmt.Println("r.DrainManager", r.DrainManager)
if reflect.DeepEqual(r.DrainManager.DrainGroup, &sync.WaitGroup{}) {
for _, target := range batch {
var (
Expand Down Expand Up @@ -247,9 +255,9 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
}
}

timeout := make(chan struct{})
done := make(chan struct{})
go func() {
defer close(timeout)
defer close(done)
r.DrainManager.DrainGroup.Wait()
}()

Expand All @@ -261,7 +269,7 @@ func (r *RollingUpgradeContext) ReplaceNodeBatch(batch []*autoscaling.Instance)
r.Error(err, "failed to rotate the node", "name", r.RollingUpgrade.NamespacedName())
return false, err

case <-timeout:
case <-done:
// goroutines completed, terminate and requeue
r.RollingUpgrade.SetLastNodeDrainTime(metav1.Time{Time: time.Now()})
r.Info("instances drained successfully, terminating", "name", r.RollingUpgrade.NamespacedName())
Expand Down Expand Up @@ -326,12 +334,16 @@ func (r *RollingUpgradeContext) SelectTargets(scalingGroup *autoscaling.Group) [

var unavailableInt int
if batchSize.Type == intstr.String {
unavailableInt, _ = intstr.GetValueFromIntOrPercent(&batchSize, totalNodes, true)
if strings.Contains(batchSize.StrVal, "%") {
unavailableInt, _ = intstr.GetValueFromIntOrPercent(&batchSize, totalNodes, true)
}
unavailableInt, _ = strconv.Atoi(batchSize.StrVal)
} else {
unavailableInt = batchSize.IntValue()
}

// first process all in progress instances
r.Info("selecting batch for rotation", "batch size", batchSize, "name", r.RollingUpgrade.NamespacedName())
for _, instance := range r.Cloud.InProgressInstances {
if selectedInstance := awsprovider.SelectScalingGroupInstance(instance, scalingGroup); !reflect.DeepEqual(selectedInstance, &autoscaling.Instance{}) {
targets = append(targets, selectedInstance)
Expand Down Expand Up @@ -392,6 +404,7 @@ func (r *RollingUpgradeContext) IsInstanceDrifted(instance *autoscaling.Instance
if common.ContainsEqualFold(awsprovider.TerminatingInstanceStates, aws.StringValue(instance.LifecycleState)) {
return false
}

// check if there is atleast one node that meets the force-referesh criteria
if r.RollingUpgrade.IsForceRefresh() {
var (
Expand All @@ -407,18 +420,15 @@ func (r *RollingUpgradeContext) IsInstanceDrifted(instance *autoscaling.Instance

if scalingGroup.LaunchConfigurationName != nil {
if instance.LaunchConfigurationName == nil {
r.Info("launch configuration name differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName())
return true
}
launchConfigName := aws.StringValue(scalingGroup.LaunchConfigurationName)
instanceConfigName := aws.StringValue(instance.LaunchConfigurationName)
if !strings.EqualFold(launchConfigName, instanceConfigName) {
r.Info("launch configuration name differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName())
return true
}
} else if scalingGroup.LaunchTemplate != nil {
if instance.LaunchTemplate == nil {
r.Info("launch template name differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName())
return true
}

Expand All @@ -430,16 +440,13 @@ func (r *RollingUpgradeContext) IsInstanceDrifted(instance *autoscaling.Instance
)

if !strings.EqualFold(launchTemplateName, instanceTemplateName) {
r.Info("launch template name differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName())
return true
} else if !strings.EqualFold(instanceTemplateVersion, templateVersion) {
r.Info("launch template version differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName())
return true
}

} else if scalingGroup.MixedInstancesPolicy != nil {
if instance.LaunchTemplate == nil {
r.Info("launch template name differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName())
return true
}

Expand All @@ -451,15 +458,12 @@ func (r *RollingUpgradeContext) IsInstanceDrifted(instance *autoscaling.Instance
)

if !strings.EqualFold(launchTemplateName, instanceTemplateName) {
r.Info("launch template name differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName())
return true
} else if !strings.EqualFold(instanceTemplateVersion, templateVersion) {
r.Info("launch template version differs", "instance", instanceID, "name", r.RollingUpgrade.NamespacedName())
return true
}
}

r.Info("node refresh not required", "name", r.RollingUpgrade.NamespacedName(), "instance", instanceID)
return false
}

Expand All @@ -469,9 +473,11 @@ func (r *RollingUpgradeContext) IsScalingGroupDrifted() bool {
scalingGroup := awsprovider.SelectScalingGroup(r.RollingUpgrade.ScalingGroupName(), r.Cloud.ScalingGroups)
for _, instance := range scalingGroup.Instances {
if r.IsInstanceDrifted(instance) {
r.Info("launch definition differs", "instance", aws.StringValue(instance.InstanceId), "name", r.RollingUpgrade.NamespacedName())
return true
}
}
r.Info("no drift in scaling group", "name", r.RollingUpgrade.NamespacedName())
return false
}

Expand All @@ -485,6 +491,7 @@ func (r *RollingUpgradeContext) DesiredNodesReady() bool {
// wait for desired instances
inServiceInstanceIDs := awsprovider.GetInServiceInstanceIDs(scalingGroup.Instances)
if len(inServiceInstanceIDs) != int(desiredInstances) {
r.Info("desired number of instances are not InService", "desired", int(desiredInstances), "inServiceCount", len(inServiceInstanceIDs), "name", r.RollingUpgrade.NamespacedName())
return false
}

Expand All @@ -498,6 +505,7 @@ func (r *RollingUpgradeContext) DesiredNodesReady() bool {
}
}
if readyNodes != int(desiredInstances) {
r.Info("desired number of nodes are not ready", "desired", int(desiredInstances), "readyNodesCount", readyNodes, "name", r.RollingUpgrade.NamespacedName())
return false
}

Expand Down

0 comments on commit 18e0e75

Please sign in to comment.