From ed44191defa84f588167bc999b2aec5e84c7a96a Mon Sep 17 00:00:00 2001 From: xshao Date: Tue, 2 Mar 2021 13:22:38 -0800 Subject: [PATCH] Metrics features Signed-off-by: xshao --- api/v1alpha1/rollingupgrade_types.go | 106 ++++++++++++++++++ api/v1alpha1/rollingupgrade_types_test.go | 47 ++++++++ api/v1alpha1/zz_generated.deepcopy.go | 60 ++++++++++ ...grademgr.keikoproj.io_rollingupgrades.yaml | 33 ++++++ controllers/common/metrics.go | 74 ++++++++++++ controllers/common/metrics_test.go | 29 +++++ controllers/providers/kubernetes/utils.go | 10 +- controllers/upgrade.go | 28 +++++ go.mod | 8 +- main.go | 5 + 10 files changed, 393 insertions(+), 7 deletions(-) create mode 100644 api/v1alpha1/rollingupgrade_types_test.go create mode 100644 controllers/common/metrics.go create mode 100644 controllers/common/metrics_test.go diff --git a/api/v1alpha1/rollingupgrade_types.go b/api/v1alpha1/rollingupgrade_types.go index f0fc2d00..bc3447a8 100644 --- a/api/v1alpha1/rollingupgrade_types.go +++ b/api/v1alpha1/rollingupgrade_types.go @@ -20,6 +20,7 @@ import ( "fmt" "strconv" "strings" + "time" "github.com/keikoproj/upgrade-manager/controllers/common" corev1 "k8s.io/api/core/v1" @@ -52,6 +53,85 @@ type RollingUpgradeStatus struct { Conditions []RollingUpgradeCondition `json:"conditions,omitempty"` LastNodeTerminationTime metav1.Time `json:"lastTerminationTime,omitempty"` LastNodeDrainTime metav1.Time `json:"lastDrainTime,omitempty"` + + Statistics []*RollingUpgradeStatistics `json:"statistics,omitempty"` + InProcessingNodes map[string]*NodeInProcessing `json:"inProcessingNodes,omitempty"` +} + +// RollingUpgrade Statistics, includes summary(sum/count) from each step +type RollingUpgradeStatistics struct { + StepName RollingUpgradeStep `json:"stepName,omitempty"` + DurationSum metav1.Duration `json:"durationSum,omitempty"` + DurationCount int32 `json:"durationCount,omitempty"` +} + +// Node In-processing +type NodeInProcessing struct { + NodeName string `json:"nodeName,omitempty"` + StepName RollingUpgradeStep `json:"stepName,omitempty"` + UpgradeStartTime metav1.Time `json:"upgradeStartTime,omitempty"` + StepStartTime metav1.Time `json:"stepStartTime,omitempty"` + StepEndTime metav1.Time `json:"stepEndTime,omitempty"` +} + +// Add one step duration +func (s *RollingUpgradeStatus) addStepDuration(asgName string, stepName RollingUpgradeStep, duration time.Duration) { + // if step exists, add count and sum, otherwise append + for _, s := range s.Statistics { + if s.StepName == stepName { + s.DurationSum = metav1.Duration{ + Duration: s.DurationSum.Duration + duration, + } + s.DurationCount += 1 + return + } + } + s.Statistics = append(s.Statistics, &RollingUpgradeStatistics{ + StepName: stepName, + DurationSum: metav1.Duration{ + Duration: duration, + }, + DurationCount: 1, + }) + + //Add to system level statistics + common.AddRollingUpgradeStepDuration(asgName, string(stepName), duration) +} + +// Node turns onto step +func (s *RollingUpgradeStatus) NodeStep(asgName string, nodeName string, stepName RollingUpgradeStep) { + if s.InProcessingNodes == nil { + s.InProcessingNodes = make(map[string]*NodeInProcessing) + } + var inProcessingNode *NodeInProcessing + if n, ok := s.InProcessingNodes[nodeName]; !ok { + inProcessingNode = &NodeInProcessing{ + NodeName: nodeName, + StepName: stepName, + UpgradeStartTime: metav1.Now(), + StepStartTime: metav1.Now(), + } + s.InProcessingNodes[nodeName] = inProcessingNode + } else { + inProcessingNode = n + n.StepEndTime = metav1.Now() + var duration = n.StepEndTime.Sub(n.StepStartTime.Time) + if stepName == NodeRotationCompleted { + //Add overall and remove the node from in-processing map + var total = n.StepEndTime.Sub(n.UpgradeStartTime.Time) + s.addStepDuration(asgName, inProcessingNode.StepName, duration) + s.addStepDuration(asgName, NodeRotationTotal, total) + delete(s.InProcessingNodes, nodeName) + } else if inProcessingNode.StepName != stepName { //Still same step + var oldOrder = NodeRotationStepOrders[inProcessingNode.StepName] + var newOrder = NodeRotationStepOrders[stepName] + if newOrder > oldOrder { //Make sure the steps running in order + s.addStepDuration(asgName, inProcessingNode.StepName, duration) + n.StepStartTime = metav1.Now() + inProcessingNode.StepName = stepName + } + } + } } func (s *RollingUpgradeStatus) SetCondition(cond RollingUpgradeCondition) { @@ -115,6 +195,8 @@ type NodeReadinessGate struct { MatchLabels map[string]string `json:"matchLabels,omitempty" protobuf:"bytes,1,rep,name=matchLabels"` } +type RollingUpgradeStep string + const ( // Status StatusInit = "init" @@ -124,8 +206,32 @@ const ( // Conditions UpgradeComplete UpgradeConditionType = "Complete" + + NodeRotationTotal RollingUpgradeStep = "total" + + NodeRotationKickoff RollingUpgradeStep = "kickoff" + NodeRotationDesiredNodeReady RollingUpgradeStep = "desired_node_ready" + NodeRotationPredrainScript RollingUpgradeStep = "predrain_script" + NodeRotationDrain RollingUpgradeStep = "drain" + NodeRotationPostdrainScript RollingUpgradeStep = "postdrain_script" + NodeRotationPostWait RollingUpgradeStep = "post_wait" + NodeRotationTerminate RollingUpgradeStep = "terminate" + NodeRotationPostTerminate RollingUpgradeStep = "post_terminate" + NodeRotationCompleted RollingUpgradeStep = "completed" ) +var NodeRotationStepOrders = map[RollingUpgradeStep]int{ + NodeRotationKickoff: 10, + NodeRotationDesiredNodeReady: 20, + NodeRotationPredrainScript: 30, + NodeRotationDrain: 40, + NodeRotationPostdrainScript: 50, + NodeRotationPostWait: 60, + NodeRotationTerminate: 70, + NodeRotationPostTerminate: 80, + NodeRotationCompleted: 1000, +} + var ( FiniteStates = []string{StatusComplete, StatusError} AllowedStrategyType = []string{string(RandomUpdateStrategy), string(UniformAcrossAzUpdateStrategy)} diff --git a/api/v1alpha1/rollingupgrade_types_test.go b/api/v1alpha1/rollingupgrade_types_test.go new file mode 100644 index 00000000..5e752341 --- /dev/null +++ b/api/v1alpha1/rollingupgrade_types_test.go @@ -0,0 +1,47 @@ +package v1alpha1 + +import ( + "github.com/onsi/gomega" + "testing" +) + +// Test +func TestNodeTurnsOntoStep(t *testing.T) { + g := gomega.NewGomegaWithT(t) + + r := &RollingUpgradeStatus{} + + r.NodeStep("test-asg", "node-1", NodeRotationKickoff) + + g.Expect(r.InProcessingNodes).NotTo(gomega.BeNil()) + g.Expect(r.Statistics).To(gomega.BeNil()) + + r.NodeStep("test-asg", "node-1", NodeRotationDesiredNodeReady) + + g.Expect(r.Statistics).NotTo(gomega.BeNil()) + g.Expect(len(r.Statistics)).To(gomega.Equal(1)) + g.Expect(r.Statistics[0].StepName).To(gomega.Equal(NodeRotationKickoff)) + + //Retry desired_node_ready + r.NodeStep("test-asg", "node-1", NodeRotationDesiredNodeReady) + g.Expect(len(r.Statistics)).To(gomega.Equal(1)) + g.Expect(r.Statistics[0].StepName).To(gomega.Equal(NodeRotationKickoff)) + + //Retry desired_node_ready again + r.NodeStep("test-asg", "node-1", NodeRotationDesiredNodeReady) + g.Expect(len(r.Statistics)).To(gomega.Equal(1)) + g.Expect(r.Statistics[0].StepName).To(gomega.Equal(NodeRotationKickoff)) + + //Completed + r.NodeStep("test-asg", "node-1", NodeRotationCompleted) + g.Expect(len(r.Statistics)).To(gomega.Equal(3)) + g.Expect(r.Statistics[1].StepName).To(gomega.Equal(NodeRotationDesiredNodeReady)) + g.Expect(r.Statistics[2].StepName).To(gomega.Equal(NodeRotationTotal)) + + //Second node + r.NodeStep("test-asg", "node-2", NodeRotationKickoff) + g.Expect(len(r.Statistics)).To(gomega.Equal(3)) + + r.NodeStep("test-asg", "node-2", NodeRotationDesiredNodeReady) + g.Expect(len(r.Statistics)).To(gomega.Equal(3)) +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 5d613e18..1c76fdc4 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -24,6 +24,24 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeInProcessing) DeepCopyInto(out *NodeInProcessing) { + *out = *in + in.UpgradeStartTime.DeepCopyInto(&out.UpgradeStartTime) + in.StepStartTime.DeepCopyInto(&out.StepStartTime) + in.StepEndTime.DeepCopyInto(&out.StepEndTime) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeInProcessing. +func (in *NodeInProcessing) DeepCopy() *NodeInProcessing { + if in == nil { + return nil + } + out := new(NodeInProcessing) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *NodeReadinessGate) DeepCopyInto(out *NodeReadinessGate) { *out = *in @@ -191,6 +209,22 @@ func (in *RollingUpgradeSpec) DeepCopy() *RollingUpgradeSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RollingUpgradeStatistics) DeepCopyInto(out *RollingUpgradeStatistics) { + *out = *in + out.DurationSum = in.DurationSum +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RollingUpgradeStatistics. +func (in *RollingUpgradeStatistics) DeepCopy() *RollingUpgradeStatistics { + if in == nil { + return nil + } + out := new(RollingUpgradeStatistics) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RollingUpgradeStatus) DeepCopyInto(out *RollingUpgradeStatus) { *out = *in @@ -201,6 +235,32 @@ func (in *RollingUpgradeStatus) DeepCopyInto(out *RollingUpgradeStatus) { } in.LastNodeTerminationTime.DeepCopyInto(&out.LastNodeTerminationTime) in.LastNodeDrainTime.DeepCopyInto(&out.LastNodeDrainTime) + if in.Statistics != nil { + in, out := &in.Statistics, &out.Statistics + *out = make([]*RollingUpgradeStatistics, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(RollingUpgradeStatistics) + **out = **in + } + } + } + if in.InProcessingNodes != nil { + in, out := &in.InProcessingNodes, &out.InProcessingNodes + *out = make(map[string]*NodeInProcessing, len(*in)) + for key, val := range *in { + var outVal *NodeInProcessing + if val == nil { + (*out)[key] = nil + } else { + in, out := &val, &outVal + *out = new(NodeInProcessing) + (*in).DeepCopyInto(*out) + } + (*out)[key] = outVal + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RollingUpgradeStatus. diff --git a/config/crd/bases/upgrademgr.keikoproj.io_rollingupgrades.yaml b/config/crd/bases/upgrademgr.keikoproj.io_rollingupgrades.yaml index f8eb03d1..615ffff6 100644 --- a/config/crd/bases/upgrademgr.keikoproj.io_rollingupgrades.yaml +++ b/config/crd/bases/upgrademgr.keikoproj.io_rollingupgrades.yaml @@ -133,6 +133,25 @@ spec: type: string endTime: type: string + inProcessingNodes: + additionalProperties: + description: Node In-processing + properties: + nodeName: + type: string + stepEndTime: + format: date-time + type: string + stepName: + type: string + stepStartTime: + format: date-time + type: string + upgradeStartTime: + format: date-time + type: string + type: object + type: object lastDrainTime: format: date-time type: string @@ -143,6 +162,20 @@ spec: type: integer startTime: type: string + statistics: + items: + description: RollingUpgrade Statistics, includes summary(sum/count) + from each step + properties: + durationCount: + format: int32 + type: integer + durationSum: + type: string + stepName: + type: string + type: object + type: array totalNodes: type: integer totalProcessingTime: diff --git a/controllers/common/metrics.go b/controllers/common/metrics.go new file mode 100644 index 00000000..649429dc --- /dev/null +++ b/controllers/common/metrics.go @@ -0,0 +1,74 @@ +package common + +import ( + "github.com/keikoproj/upgrade-manager/controllers/common/log" + "github.com/prometheus/client_golang/prometheus" + "reflect" + "sigs.k8s.io/controller-runtime/pkg/metrics" + "strings" + "time" +) + +//All cluster level node upgrade statistics + +var nodeRotationTotal = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "node", + Name: "rotation_total_seconds", + Help: "Node rotation total", + Buckets: []float64{ + 10.0, + 30.0, + 60.0, + 90.0, + 120.0, + 180.0, + 300.0, + 600.0, + 900.0, + }, + }) + +var stepSummaries = make(map[string]map[string]prometheus.Summary) + +func InitMetrics() { + metrics.Registry.MustRegister(nodeRotationTotal) +} + +// Add rolling update step duration when the step is completed +func AddRollingUpgradeStepDuration(asgName string, stepName string, duration time.Duration) { + if strings.EqualFold(stepName, "total") { //Histogram + nodeRotationTotal.Observe(duration.Seconds()) + } else { //Summary + var steps map[string]prometheus.Summary + if m, ok := stepSummaries[asgName]; !ok { + steps = make(map[string]prometheus.Summary) + stepSummaries[asgName] = steps + } else { + steps = m + } + + var summary prometheus.Summary + if s, ok := steps[stepName]; !ok { + summary = prometheus.NewSummary( + prometheus.SummaryOpts{ + Namespace: "node", + Name: stepName + "_seconds", + Help: "Summary for node " + stepName, + ConstLabels: prometheus.Labels{"asg": asgName}, + }) + err := metrics.Registry.Register(summary) + if err != nil { + if reflect.TypeOf(err).String() == "prometheus.AlreadyRegisteredError" { + log.Warnf("summary was registered again, ASG: %s, step: %s", asgName, stepName) + } else { + log.Errorf("register summary error, ASG: %s, step: %s, %v", asgName, stepName, err) + } + } + steps[stepName] = summary + } else { + summary = s + } + summary.Observe(duration.Seconds()) + } +} diff --git a/controllers/common/metrics_test.go b/controllers/common/metrics_test.go new file mode 100644 index 00000000..296ae336 --- /dev/null +++ b/controllers/common/metrics_test.go @@ -0,0 +1,29 @@ +package common + +import ( + "github.com/onsi/gomega" + "testing" +) + +func TestAddRollingUpgradeStepDuration(t *testing.T) { + g := gomega.NewGomegaWithT(t) + + g.Expect(stepSummaries["test-asg"]).To(gomega.BeNil()) + AddRollingUpgradeStepDuration("test-asg", "kickoff", 1) + + g.Expect(stepSummaries["test-asg"]).NotTo(gomega.BeNil()) + g.Expect(stepSummaries["test-asg"]["kickoff"]).NotTo(gomega.BeNil()) + + //Test duplicate + AddRollingUpgradeStepDuration("test-asg", "kickoff", 1) + g.Expect(stepSummaries["test-asg"]["kickoff"]).NotTo(gomega.BeNil()) + + //Test duplicate + delete(stepSummaries["test-asg"], "kickoff") + AddRollingUpgradeStepDuration("test-asg", "kickoff", 1) + g.Expect(stepSummaries["test-asg"]["kickoff"]).NotTo(gomega.BeNil()) + + //Test total + AddRollingUpgradeStepDuration("test-asg", "total", 1) + g.Expect(stepSummaries["test-asg"]["kickoff"]).NotTo(gomega.BeNil()) +} diff --git a/controllers/providers/kubernetes/utils.go b/controllers/providers/kubernetes/utils.go index 79f148b8..6ea58764 100644 --- a/controllers/providers/kubernetes/utils.go +++ b/controllers/providers/kubernetes/utils.go @@ -87,10 +87,12 @@ func GetKubernetesLocalConfig() (*rest.Config, error) { } func SelectNodeByInstanceID(instanceID string, nodes *corev1.NodeList) corev1.Node { - for _, node := range nodes.Items { - nodeID := GetNodeInstanceID(node) - if strings.EqualFold(instanceID, nodeID) { - return node + if nodes != nil { + for _, node := range nodes.Items { + nodeID := GetNodeInstanceID(node) + if strings.EqualFold(instanceID, nodeID) { + return node + } } } return corev1.Node{} diff --git a/controllers/upgrade.go b/controllers/upgrade.go index 76a5d78f..f008876c 100644 --- a/controllers/upgrade.go +++ b/controllers/upgrade.go @@ -108,6 +108,9 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol } ) + //Add statistics + rollingUpgrade.Status.NodeStep(rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationKickoff) + // Add in-progress tag if err := r.Auth.TagEC2instance(instanceID, instanceStateTagKey, inProgressTagValue); err != nil { r.Error(err, "failed to set instance tag", "name", rollingUpgrade.NamespacedName(), "instance", instanceID) @@ -123,12 +126,18 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol } } + // Turns onto desired nodes + rollingUpgrade.Status.NodeStep(rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationDesiredNodeReady) + // Wait for desired nodes if !r.DesiredNodesReady(rollingUpgrade) { r.Info("new node is yet to join the cluster") return true, nil } + // Turns onto PreDrain script + rollingUpgrade.Status.NodeStep(rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationPredrainScript) + // Predrain script if err := r.ScriptRunner.PreDrain(scriptTarget); err != nil { return false, err @@ -137,6 +146,10 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol // Issue drain concurrently - set lastDrainTime if node := kubeprovider.SelectNodeByInstanceID(instanceID, r.Cloud.ClusterNodes); !reflect.DeepEqual(node, corev1.Node{}) { r.Info("draining the node", "name", rollingUpgrade.NamespacedName(), "instance", instanceID, "node name", node.Name) + + // Turns onto NodeRotationDrain + rollingUpgrade.Status.NodeStep(rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationDrain) + if err := r.Auth.DrainNode(&node, time.Duration(rollingUpgrade.PostDrainDelaySeconds()), rollingUpgrade.DrainTimeout(), r.Auth.Kubernetes); err != nil { r.Error(err, "failed to drain node", "name", rollingUpgrade.NamespacedName(), "instance", instanceID, "node name", node.Name) return false, err @@ -144,16 +157,25 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol } rollingUpgrade.SetLastNodeDrainTime(metav1.Time{Time: time.Now()}) + // Turns onto NodeRotationPostdrainScript + rollingUpgrade.Status.NodeStep(rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationPostdrainScript) + // post drain script if err := r.ScriptRunner.PostDrain(scriptTarget); err != nil { return false, err } + // Turns onto NodeRotationPostWait + rollingUpgrade.Status.NodeStep(rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationPostWait) + // Post Wait Script if err := r.ScriptRunner.PostWait(scriptTarget); err != nil { return false, err } + // Turns onto NodeRotationTerminate + rollingUpgrade.Status.NodeStep(rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationTerminate) + // Terminate - set lastTerminateTime r.Info("terminating instance", "name", rollingUpgrade.NamespacedName(), "instance", instanceID) if err := r.Auth.TerminateInstance(target); err != nil { @@ -162,10 +184,16 @@ func (r *RollingUpgradeReconciler) ReplaceNodeBatch(rollingUpgrade *v1alpha1.Rol } rollingUpgrade.SetLastNodeTerminationTime(metav1.Time{Time: time.Now()}) + // Turns onto NodeRotationTerminate + rollingUpgrade.Status.NodeStep(rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationPostTerminate) + // Post Terminate Script if err := r.ScriptRunner.PostTerminate(scriptTarget); err != nil { return false, err } + + // Turns onto NodeRotationCompleted + rollingUpgrade.Status.NodeStep(rollingUpgrade.Spec.AsgName, nodeName, v1alpha1.NodeRotationCompleted) } case v1alpha1.UpdateStrategyModeLazy: for _, target := range batch { diff --git a/go.mod b/go.mod index 816421ef..b444654f 100644 --- a/go.mod +++ b/go.mod @@ -8,10 +8,12 @@ require ( github.com/keikoproj/aws-sdk-go-cache v0.0.0-20201118182730-f6f418a4e2df github.com/onsi/gomega v1.10.2 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.7.1 github.com/sirupsen/logrus v1.6.0 go.uber.org/zap v1.15.0 - k8s.io/api v0.19.2 - k8s.io/apimachinery v0.19.2 - k8s.io/client-go v0.19.2 + k8s.io/api v0.20.4 + k8s.io/apimachinery v0.20.4 + k8s.io/client-go v0.20.4 + k8s.io/kubectl v0.20.4 sigs.k8s.io/controller-runtime v0.7.0 ) diff --git a/main.go b/main.go index 3a5ef2b0..c6fd6fbc 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "github.com/keikoproj/upgrade-manager/controllers/common" "os" "time" @@ -67,6 +68,8 @@ func init() { utilruntime.Must(upgrademgrv1alpha1.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme + + common.InitMetrics() } func main() { @@ -212,6 +215,8 @@ func main() { os.Exit(1) } + setupLog.Info("registering prometheus") + setupLog.Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running manager")