Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize WaitForStatefulSetRollout and WaitForDeploymentRollout #3418

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/karmadactl/addons/descheduler/descheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package descheduler
import (
"context"
"fmt"
"time"

appsv1 "k8s.io/api/apps/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -60,7 +61,7 @@ var enableDescheduler = func(opts *addoninit.CommandAddonsEnableOption) error {
return fmt.Errorf("create karmada descheduler deployment error: %v", err)
}

if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaDeschedulerDeployment, opts.WaitComponentReadyTimeout); err != nil {
if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaDeschedulerDeployment, time.Duration(opts.WaitComponentReadyTimeout)); err != nil {
return fmt.Errorf("wait karmada descheduler pod timeout: %v", err)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/karmadactl/addons/estimator/estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package estimator
import (
"context"
"fmt"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -111,7 +112,7 @@ var enableEstimator = func(opts *addoninit.CommandAddonsEnableOption) error {
return fmt.Errorf("create or update scheduler estimator deployment error: %v", err)
}

if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaEstimatorDeployment, opts.WaitComponentReadyTimeout); err != nil {
if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaEstimatorDeployment, time.Duration(opts.WaitComponentReadyTimeout)); err != nil {
klog.Warning(err)
}
klog.Infof("Karmada scheduler estimator of member cluster %s is installed successfully.", opts.Cluster)
Expand Down
2 changes: 1 addition & 1 deletion pkg/karmadactl/addons/search/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func installComponentsOnHostCluster(opts *addoninit.CommandAddonsEnableOption) e
return fmt.Errorf("create karmada search deployment error: %v", err)
}

if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaSearchDeployment, opts.WaitComponentReadyTimeout); err != nil {
if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaSearchDeployment, time.Duration(opts.WaitComponentReadyTimeout)); err != nil {
return fmt.Errorf("wait karmada search pod status ready timeout: %v", err)
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/karmadactl/cmdinit/kubernetes/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (i *CommandInitOption) Validate(parentCommand string) error {
return fmt.Errorf("when etcd storage mode is PVC, storageClassesName is not empty. See '%s init --help'", parentCommand)
}

if i.WaitComponentReadyTimeout < 0 {
if time.Duration(i.WaitComponentReadyTimeout) < 0 {
return fmt.Errorf("wait-component-ready-timeout must be greater than or equal to 0")
}

Expand Down Expand Up @@ -365,7 +365,7 @@ func (i *CommandInitOption) initKarmadaAPIServer() error {
if _, err := i.KubeClientSet.AppsV1().StatefulSets(i.Namespace).Create(context.TODO(), etcdStatefulSet, metav1.CreateOptions{}); err != nil {
klog.Warning(err)
}
if err := util.WaitForStatefulSetRollout(i.KubeClientSet, etcdStatefulSet, i.WaitComponentReadyTimeout); err != nil {
if err := util.WaitForStatefulSetRollout(i.KubeClientSet, etcdStatefulSet, time.Duration(i.WaitComponentReadyTimeout)); err != nil {
klog.Warning(err)
}
klog.Info("Create karmada ApiServer Deployment")
Expand All @@ -377,7 +377,7 @@ func (i *CommandInitOption) initKarmadaAPIServer() error {
if _, err := i.KubeClientSet.AppsV1().Deployments(i.Namespace).Create(context.TODO(), karmadaAPIServerDeployment, metav1.CreateOptions{}); err != nil {
klog.Warning(err)
}
if err := util.WaitForDeploymentRollout(i.KubeClientSet, karmadaAPIServerDeployment, i.WaitComponentReadyTimeout); err != nil {
if err := util.WaitForDeploymentRollout(i.KubeClientSet, karmadaAPIServerDeployment, time.Duration(i.WaitComponentReadyTimeout)); err != nil {
return err
}

Expand All @@ -391,7 +391,7 @@ func (i *CommandInitOption) initKarmadaAPIServer() error {
if _, err := i.KubeClientSet.AppsV1().Deployments(i.Namespace).Create(context.TODO(), i.makeKarmadaAggregatedAPIServerDeployment(), metav1.CreateOptions{}); err != nil {
klog.Warning(err)
}
if err := util.WaitForDeploymentRollout(i.KubeClientSet, karmadaAggregatedAPIServerDeployment, i.WaitComponentReadyTimeout); err != nil {
if err := util.WaitForDeploymentRollout(i.KubeClientSet, karmadaAggregatedAPIServerDeployment, time.Duration(i.WaitComponentReadyTimeout)); err != nil {
klog.Warning(err)
}
return nil
Expand All @@ -409,7 +409,7 @@ func (i *CommandInitOption) initKarmadaComponent() error {
if _, err := deploymentClient.Create(context.TODO(), karmadaKubeControllerManagerDeployment, metav1.CreateOptions{}); err != nil {
klog.Warning(err)
}
if err := util.WaitForDeploymentRollout(i.KubeClientSet, karmadaKubeControllerManagerDeployment, i.WaitComponentReadyTimeout); err != nil {
if err := util.WaitForDeploymentRollout(i.KubeClientSet, karmadaKubeControllerManagerDeployment, time.Duration(i.WaitComponentReadyTimeout)); err != nil {
klog.Warning(err)
}

Expand All @@ -420,7 +420,7 @@ func (i *CommandInitOption) initKarmadaComponent() error {
if _, err := deploymentClient.Create(context.TODO(), karmadaSchedulerDeployment, metav1.CreateOptions{}); err != nil {
klog.Warning(err)
}
if err := util.WaitForDeploymentRollout(i.KubeClientSet, karmadaSchedulerDeployment, i.WaitComponentReadyTimeout); err != nil {
if err := util.WaitForDeploymentRollout(i.KubeClientSet, karmadaSchedulerDeployment, time.Duration(i.WaitComponentReadyTimeout)); err != nil {
klog.Warning(err)
}

Expand All @@ -431,7 +431,7 @@ func (i *CommandInitOption) initKarmadaComponent() error {
if _, err := deploymentClient.Create(context.TODO(), karmadaControllerManagerDeployment, metav1.CreateOptions{}); err != nil {
klog.Warning(err)
}
if err := util.WaitForDeploymentRollout(i.KubeClientSet, karmadaControllerManagerDeployment, i.WaitComponentReadyTimeout); err != nil {
if err := util.WaitForDeploymentRollout(i.KubeClientSet, karmadaControllerManagerDeployment, time.Duration(i.WaitComponentReadyTimeout)); err != nil {
klog.Warning(err)
}

Expand All @@ -445,7 +445,7 @@ func (i *CommandInitOption) initKarmadaComponent() error {
if _, err := deploymentClient.Create(context.TODO(), karmadaWebhookDeployment, metav1.CreateOptions{}); err != nil {
klog.Warning(err)
}
if err := util.WaitForDeploymentRollout(i.KubeClientSet, karmadaWebhookDeployment, i.WaitComponentReadyTimeout); err != nil {
if err := util.WaitForDeploymentRollout(i.KubeClientSet, karmadaWebhookDeployment, time.Duration(i.WaitComponentReadyTimeout)); err != nil {
klog.Warning(err)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/karmadactl/register/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
# Register cluster into karmada control plane with Pull mode.
# If '--cluster-name' isn't specified, the cluster of current-context will be used by default.
%[1]s register [karmada-apiserver-endpoint] --cluster-name=<CLUSTER_NAME> --token=<TOKEN> --discovery-token-ca-cert-hash=<CA-CERT-HASH>

# UnsafeSkipCAVerification allows token-based discovery without CA verification via CACertHashes. This can weaken
# the security of register command since other clusters can impersonate the control-plane.
%[1]s register [karmada-apiserver-endpoint] --token=<TOKEN> --discovery-token-unsafe-skip-ca-verification=true
Expand Down Expand Up @@ -357,7 +357,7 @@ func (o *CommandRegisterOption) Run(parentCommand string) error {
return err
}

if err := cmdutil.WaitForDeploymentRollout(o.memberClusterClient, KarmadaAgentDeployment, int(o.Timeout)); err != nil {
if err := cmdutil.WaitForDeploymentRollout(o.memberClusterClient, KarmadaAgentDeployment, o.Timeout); err != nil {
return err
}

Expand Down
178 changes: 123 additions & 55 deletions pkg/karmadactl/util/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,72 +6,140 @@ import (
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

// Rollout states returned by stsStatus and deploymentStatus.
const (
// StatusReady is returned when every readiness check on the workload passes.
StatusReady = "Ready"
// StatusInProgress is returned when the workload decoded fine but is not ready yet.
StatusInProgress = "InProgress"
// StatusUnknown is returned together with an error when the watched object cannot be converted.
StatusUnknown = "Unknown"
)

// WaitForStatefulSetRollout wait for StatefulSet reaches the ready state or timeout.
func WaitForStatefulSetRollout(c kubernetes.Interface, sts *appsv1.StatefulSet, timeoutSeconds int) error {
var lastErr error
pollError := wait.PollImmediate(time.Second, time.Duration(timeoutSeconds)*time.Second, func() (bool, error) {
s, err := c.AppsV1().StatefulSets(sts.GetNamespace()).Get(context.TODO(), sts.GetName(), metav1.GetOptions{})
if err != nil {
lastErr = err
return false, nil
}
if s.Generation != s.Status.ObservedGeneration {
lastErr = fmt.Errorf("expected generation %d, observed generation: %d",
s.Generation, s.Status.ObservedGeneration)
return false, nil
}
if (s.Spec.Replicas != nil) && (s.Status.UpdatedReplicas < *s.Spec.Replicas) {
lastErr = fmt.Errorf("expected %d replicas, got %d updated replicas",
*s.Spec.Replicas, s.Status.UpdatedReplicas)
return false, nil
}
if s.Status.AvailableReplicas < s.Status.UpdatedReplicas {
lastErr = fmt.Errorf("expected %d replicas, got %d available replicas",
s.Status.UpdatedReplicas, s.Status.AvailableReplicas)
return false, nil
func WaitForStatefulSetRollout(c kubernetes.Interface, sts *appsv1.StatefulSet, timeoutSeconds time.Duration) error {
stsWatcher, err := c.AppsV1().StatefulSets(sts.GetNamespace()).Watch(context.Background(), metav1.SingleObject(sts.ObjectMeta))
helen-frank marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
defer stsWatcher.Stop()

for {
select {
case <-time.After(timeoutSeconds * time.Second):
klog.Errorf("wait for StatefulSets(%s/%s) rollout: timeout", sts.GetNamespace(), sts.GetName())
return nil
case event, ok := <-stsWatcher.ResultChan():
if !ok {
return fmt.Errorf("wait for Statefulset(%s/%s) rollout: channel broken", sts.GetNamespace(), sts.GetName())
}

status, err := stsStatus(event.Object)
if err != nil {
return err
} else if status == StatusReady {
return nil
}
}
return true, nil
})
if pollError != nil {
return fmt.Errorf("wait for Statefulset(%s/%s) rollout: %v: %v", sts.GetNamespace(), sts.GetName(), pollError, lastErr)
}
return nil
}

// WaitForDeploymentRollout wait for Deployment reaches the ready state or timeout.
func WaitForDeploymentRollout(c kubernetes.Interface, dep *appsv1.Deployment, timeoutSeconds int) error {
var lastErr error
pollError := wait.PollImmediate(time.Second, time.Duration(timeoutSeconds)*time.Second, func() (bool, error) {
d, err := c.AppsV1().Deployments(dep.GetNamespace()).Get(context.TODO(), dep.GetName(), metav1.GetOptions{})
if err != nil {
lastErr = err
return false, nil
}
if d.Generation != d.Status.ObservedGeneration {
lastErr = fmt.Errorf("current generation %d, observed generation %d",
d.Generation, d.Status.ObservedGeneration)
return false, nil
}
if (d.Spec.Replicas != nil) && (d.Status.UpdatedReplicas < *d.Spec.Replicas) {
lastErr = fmt.Errorf("the number of pods targeted by the deployment (%d pods) is different "+
"from the number of pods targeted by the deployment that have the desired template spec (%d pods)",
*d.Spec.Replicas, d.Status.UpdatedReplicas)
return false, nil
func WaitForDeploymentRollout(c kubernetes.Interface, dep *appsv1.Deployment, timeoutSeconds time.Duration) error {
depWatcher, err := c.AppsV1().Deployments(dep.GetNamespace()).Watch(context.Background(), metav1.SingleObject(dep.ObjectMeta))
helen-frank marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
defer depWatcher.Stop()

for {
select {
case <-time.After(timeoutSeconds * time.Second):
klog.Errorf("wait for Deployment(%s/%s) rollout: timeout", dep.GetNamespace(), dep.GetName())
return nil
case event, ok := <-depWatcher.ResultChan():
if !ok {
return fmt.Errorf("wait for Deployment(%s/%s) rollout: channel broken", dep.GetNamespace(), dep.GetName())
}

status, err := deploymentStatus(event.Object)
if err != nil {
return err
} else if status == StatusReady {
return nil
}
}
if d.Status.AvailableReplicas < d.Status.UpdatedReplicas {
lastErr = fmt.Errorf("expected %d replicas, got %d available replicas",
d.Status.UpdatedReplicas, d.Status.AvailableReplicas)
return false, nil
}
}

// Reference https://github.com/kubernetes-sigs/application/blob/e5329b1b083ece8abb4b4efaf738ab9277fbb2ce/controllers/status.go#L78-#L92
func stsStatus(o runtime.Object) (string, error) {
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(o)
if err != nil {
return StatusUnknown, err
}

sts := &appsv1.StatefulSet{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u, sts); err != nil {
return StatusUnknown, err
}

if sts.Status.ObservedGeneration == sts.Generation &&
sts.Status.Replicas == *sts.Spec.Replicas &&
sts.Status.ReadyReplicas == *sts.Spec.Replicas &&
sts.Status.CurrentReplicas == *sts.Spec.Replicas {
helen-frank marked this conversation as resolved.
Show resolved Hide resolved
return StatusReady, nil
}
return StatusInProgress, nil
}

// Reference https://github.com/kubernetes-sigs/application/blob/e5329b1b083ece8abb4b4efaf738ab9277fbb2ce/controllers/status.go#L94-#L132
//
//nolint:gocyclo
helen-frank marked this conversation as resolved.
Show resolved Hide resolved
func deploymentStatus(o runtime.Object) (string, error) {
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(o)
if err != nil {
return StatusUnknown, err
}

deployment := &appsv1.Deployment{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u, deployment); err != nil {
return StatusUnknown, err
}

replicaFailure := false
progressing := false
available := false

for _, condition := range deployment.Status.Conditions {
switch condition.Type {
case appsv1.DeploymentProgressing:
if condition.Status == corev1.ConditionTrue && condition.Reason == "NewReplicaSetAvailable" {
progressing = true
}
case appsv1.DeploymentAvailable:
if condition.Status == corev1.ConditionTrue {
available = true
}
case appsv1.DeploymentReplicaFailure:
if condition.Status == corev1.ConditionTrue {
replicaFailure = true
break
}
}
return true, nil
})
if pollError != nil {
return fmt.Errorf("wait for Deployment(%s/%s) rollout: %v: %v", dep.GetNamespace(), dep.GetName(), pollError, lastErr)
}
return nil

if deployment.Status.ObservedGeneration == deployment.Generation &&
deployment.Status.Replicas == *deployment.Spec.Replicas &&
deployment.Status.ReadyReplicas == *deployment.Spec.Replicas &&
deployment.Status.AvailableReplicas == *deployment.Spec.Replicas &&
deployment.Status.Conditions != nil && len(deployment.Status.Conditions) > 0 &&
(progressing || available) && !replicaFailure {
return StatusReady, nil
}
return StatusInProgress, nil
}