Skip to content

Commit

Permalink
WaitForStatefulSetRollout and WaitForDeploymentRollout Use Watch and …
Browse files Browse the repository at this point in the history
…Use kubernetes-sigs/application judge status.

Signed-off-by: helen <[email protected]>
  • Loading branch information
helen committed Apr 18, 2023
1 parent ea128f9 commit 04f9a70
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 61 deletions.
3 changes: 2 additions & 1 deletion pkg/karmadactl/addons/descheduler/descheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package descheduler
import (
"context"
"fmt"
"time"

appsv1 "k8s.io/api/apps/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -60,7 +61,7 @@ var enableDescheduler = func(opts *addoninit.CommandAddonsEnableOption) error {
return fmt.Errorf("create karmada descheduler deployment error: %v", err)
}

if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaDeschedulerDeployment, opts.WaitComponentReadyTimeout); err != nil {
if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaDeschedulerDeployment, time.Duration(opts.WaitComponentReadyTimeout)); err != nil {
return fmt.Errorf("wait karmada descheduler pod timeout: %v", err)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/karmadactl/addons/estimator/estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package estimator
import (
"context"
"fmt"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -111,7 +112,7 @@ var enableEstimator = func(opts *addoninit.CommandAddonsEnableOption) error {
return fmt.Errorf("create or update scheduler estimator deployment error: %v", err)
}

if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaEstimatorDeployment, opts.WaitComponentReadyTimeout); err != nil {
if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaEstimatorDeployment, time.Duration(opts.WaitComponentReadyTimeout)); err != nil {
klog.Warning(err)
}
klog.Infof("Karmada scheduler estimator of member cluster %s is installed successfully.", opts.Cluster)
Expand Down
2 changes: 1 addition & 1 deletion pkg/karmadactl/addons/search/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func installComponentsOnHostCluster(opts *addoninit.CommandAddonsEnableOption) e
return fmt.Errorf("create karmada search deployment error: %v", err)
}

if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaSearchDeployment, opts.WaitComponentReadyTimeout); err != nil {
if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaSearchDeployment, time.Duration(opts.WaitComponentReadyTimeout)); err != nil {
return fmt.Errorf("wait karmada search pod status ready timeout: %v", err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/karmadactl/cmdinit/kubernetes/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (i *CommandInitOption) createCertsSecrets() error {

func (i *CommandInitOption) initKarmadaAPIServer() error {
// wait karmada APIServer component ready timeout 120s
waitKarmadaAPIServerComponentReadyTimeout := 120
waitKarmadaAPIServerComponentReadyTimeout := time.Duration(120)

if err := util.CreateOrUpdateService(i.KubeClientSet, i.makeEtcdService(etcdStatefulSetAndServiceName)); err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/karmadactl/register/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
# Register cluster into karmada control plane with Pull mode.
# If '--cluster-name' isn't specified, the cluster of current-context will be used by default.
%[1]s register [karmada-apiserver-endpoint] --cluster-name=<CLUSTER_NAME> --token=<TOKEN> --discovery-token-ca-cert-hash=<CA-CERT-HASH>
# UnsafeSkipCAVerification allows token-based discovery without CA verification via CACertHashes. This can weaken
# the security of register command since other clusters can impersonate the control-plane.
%[1]s register [karmada-apiserver-endpoint] --token=<TOKEN> --discovery-token-unsafe-skip-ca-verification=true
Expand Down Expand Up @@ -357,7 +357,7 @@ func (o *CommandRegisterOption) Run(parentCommand string) error {
return err
}

if err := cmdutil.WaitForDeploymentRollout(o.memberClusterClient, KarmadaAgentDeployment, int(o.Timeout)); err != nil {
if err := cmdutil.WaitForDeploymentRollout(o.memberClusterClient, KarmadaAgentDeployment, o.Timeout); err != nil {
return err
}

Expand Down
173 changes: 118 additions & 55 deletions pkg/karmadactl/util/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,72 +6,135 @@ import (
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
)

// Constants defining labels
const (
StatusReady = "Ready"
StatusInProgress = "InProgress"
StatusUnknown = "Unknown"
)

// WaitForStatefulSetRollout wait for StatefulSet reaches the ready state or timeout.
func WaitForStatefulSetRollout(c kubernetes.Interface, sts *appsv1.StatefulSet, timeoutSeconds int) error {
var lastErr error
pollError := wait.PollImmediate(time.Second, time.Duration(timeoutSeconds)*time.Second, func() (bool, error) {
s, err := c.AppsV1().StatefulSets(sts.GetNamespace()).Get(context.TODO(), sts.GetName(), metav1.GetOptions{})
if err != nil {
lastErr = err
return false, nil
}
if s.Generation != s.Status.ObservedGeneration {
lastErr = fmt.Errorf("expected generation %d, observed generation: %d",
s.Generation, s.Status.ObservedGeneration)
return false, nil
}
if (s.Spec.Replicas != nil) && (s.Status.UpdatedReplicas < *s.Spec.Replicas) {
lastErr = fmt.Errorf("expected %d replicas, got %d updated replicas",
*s.Spec.Replicas, s.Status.UpdatedReplicas)
return false, nil
}
if s.Status.AvailableReplicas < s.Status.UpdatedReplicas {
lastErr = fmt.Errorf("expected %d replicas, got %d available replicas",
s.Status.UpdatedReplicas, s.Status.AvailableReplicas)
return false, nil
func WaitForStatefulSetRollout(c kubernetes.Interface, sts *appsv1.StatefulSet, timeoutSeconds time.Duration) error {
stsWatcher, err := c.AppsV1().StatefulSets(sts.GetNamespace()).Watch(context.Background(), metav1.SingleObject(sts.ObjectMeta))
if err != nil {
return err
}

for {
select {
case <-time.After(timeoutSeconds * time.Second):
return nil
case event, ok := <-stsWatcher.ResultChan():
if !ok {
return fmt.Errorf("wait for Statefulset(%s/%s) rollout: channel broken", sts.GetNamespace(), sts.GetName())
}

status, err := stsStatus(event.Object)
if err != nil {
return err
} else if status == StatusReady {
return nil
}
}
return true, nil
})
if pollError != nil {
return fmt.Errorf("wait for Statefulset(%s/%s) rollout: %v: %v", sts.GetNamespace(), sts.GetName(), pollError, lastErr)
}
return nil
}

// WaitForDeploymentRollout wait for Deployment reaches the ready state or timeout.
func WaitForDeploymentRollout(c kubernetes.Interface, dep *appsv1.Deployment, timeoutSeconds int) error {
var lastErr error
pollError := wait.PollImmediate(time.Second, time.Duration(timeoutSeconds)*time.Second, func() (bool, error) {
d, err := c.AppsV1().Deployments(dep.GetNamespace()).Get(context.TODO(), dep.GetName(), metav1.GetOptions{})
if err != nil {
lastErr = err
return false, nil
}
if d.Generation != d.Status.ObservedGeneration {
lastErr = fmt.Errorf("current generation %d, observed generation %d",
d.Generation, d.Status.ObservedGeneration)
return false, nil
}
if (d.Spec.Replicas != nil) && (d.Status.UpdatedReplicas < *d.Spec.Replicas) {
lastErr = fmt.Errorf("the number of pods targeted by the deployment (%d pods) is different "+
"from the number of pods targeted by the deployment that have the desired template spec (%d pods)",
*d.Spec.Replicas, d.Status.UpdatedReplicas)
return false, nil
func WaitForDeploymentRollout(c kubernetes.Interface, dep *appsv1.Deployment, timeoutSeconds time.Duration) error {
depWatcher, err := c.AppsV1().Deployments(dep.GetNamespace()).Watch(context.Background(), metav1.SingleObject(dep.ObjectMeta))
if err != nil {
return err
}

for {
select {
case <-time.After(timeoutSeconds * time.Second):
return nil
case event, ok := <-depWatcher.ResultChan():
if !ok {
return fmt.Errorf("wait for Deployment(%s/%s) rollout: channel broken", dep.GetNamespace(), dep.GetName())
}

status, err := deploymentStatus(event.Object)
if err != nil {
return err
} else if status == StatusReady {
return nil
}
}
if d.Status.AvailableReplicas < d.Status.UpdatedReplicas {
lastErr = fmt.Errorf("expected %d replicas, got %d available replicas",
d.Status.UpdatedReplicas, d.Status.AvailableReplicas)
return false, nil
}
}

// Reference https://github.com/kubernetes-sigs/application/blob/e5329b1b083ece8abb4b4efaf738ab9277fbb2ce/controllers/status.go#L78-#L92
func stsStatus(o runtime.Object) (string, error) {
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(o)
if err != nil {
return StatusUnknown, err
}

sts := &appsv1.StatefulSet{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u, sts); err != nil {
return StatusUnknown, err
}

if sts.Status.ObservedGeneration == sts.Generation &&
sts.Status.Replicas == *sts.Spec.Replicas &&
sts.Status.ReadyReplicas == *sts.Spec.Replicas &&
sts.Status.CurrentReplicas == *sts.Spec.Replicas {
return StatusReady, nil
}
return StatusInProgress, nil
}

// Reference https://github.com/kubernetes-sigs/application/blob/e5329b1b083ece8abb4b4efaf738ab9277fbb2ce/controllers/status.go#L94-#L132
//
//nolint:gocyclo
func deploymentStatus(o runtime.Object) (string, error) {
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(o)
if err != nil {
return StatusUnknown, err
}

deployment := &appsv1.Deployment{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u, deployment); err != nil {
return StatusUnknown, err
}

replicaFailure := false
progressing := false
available := false

for _, condition := range deployment.Status.Conditions {
switch condition.Type {
case appsv1.DeploymentProgressing:
if condition.Status == corev1.ConditionTrue && condition.Reason == "NewReplicaSetAvailable" {
progressing = true
}
case appsv1.DeploymentAvailable:
if condition.Status == corev1.ConditionTrue {
available = true
}
case appsv1.DeploymentReplicaFailure:
if condition.Status == corev1.ConditionTrue {
replicaFailure = true
break
}
}
return true, nil
})
if pollError != nil {
return fmt.Errorf("wait for Deployment(%s/%s) rollout: %v: %v", dep.GetNamespace(), dep.GetName(), pollError, lastErr)
}
return nil

if deployment.Status.ObservedGeneration == deployment.Generation &&
deployment.Status.Replicas == *deployment.Spec.Replicas &&
deployment.Status.ReadyReplicas == *deployment.Spec.Replicas &&
deployment.Status.AvailableReplicas == *deployment.Spec.Replicas &&
deployment.Status.Conditions != nil && len(deployment.Status.Conditions) > 0 &&
(progressing || available) && !replicaFailure {
return StatusReady, nil
}
return StatusInProgress, nil
}

0 comments on commit 04f9a70

Please sign in to comment.