diff --git a/pkg/karmadactl/addons/descheduler/descheduler.go b/pkg/karmadactl/addons/descheduler/descheduler.go index cf2df0132b8c..44872de1bb5e 100644 --- a/pkg/karmadactl/addons/descheduler/descheduler.go +++ b/pkg/karmadactl/addons/descheduler/descheduler.go @@ -3,6 +3,7 @@ package descheduler import ( "context" "fmt" + "time" appsv1 "k8s.io/api/apps/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -60,7 +61,7 @@ var enableDescheduler = func(opts *addoninit.CommandAddonsEnableOption) error { return fmt.Errorf("create karmada descheduler deployment error: %v", err) } - if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaDeschedulerDeployment, opts.WaitComponentReadyTimeout); err != nil { + if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaDeschedulerDeployment, time.Duration(opts.WaitComponentReadyTimeout)); err != nil { return fmt.Errorf("wait karmada descheduler pod timeout: %v", err) } diff --git a/pkg/karmadactl/addons/estimator/estimator.go b/pkg/karmadactl/addons/estimator/estimator.go index f3d1f78b81fb..68d7b99ced0b 100644 --- a/pkg/karmadactl/addons/estimator/estimator.go +++ b/pkg/karmadactl/addons/estimator/estimator.go @@ -3,6 +3,7 @@ package estimator import ( "context" "fmt" + "time" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -111,7 +112,7 @@ var enableEstimator = func(opts *addoninit.CommandAddonsEnableOption) error { return fmt.Errorf("create or update scheduler estimator deployment error: %v", err) } - if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaEstimatorDeployment, opts.WaitComponentReadyTimeout); err != nil { + if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaEstimatorDeployment, time.Duration(opts.WaitComponentReadyTimeout)); err != nil { klog.Warning(err) } klog.Infof("Karmada scheduler estimator of member cluster %s is installed successfully.", opts.Cluster) diff --git a/pkg/karmadactl/addons/search/search.go b/pkg/karmadactl/addons/search/search.go index 580e95d9d6e8..30c3ca90bb7d 100644 --- a/pkg/karmadactl/addons/search/search.go +++ b/pkg/karmadactl/addons/search/search.go @@ -160,7 +160,7 @@ func installComponentsOnHostCluster(opts *addoninit.CommandAddonsEnableOption) e return fmt.Errorf("create karmada search deployment error: %v", err) } - if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaSearchDeployment, opts.WaitComponentReadyTimeout); err != nil { + if err := cmdutil.WaitForDeploymentRollout(opts.KubeClientSet, karmadaSearchDeployment, time.Duration(opts.WaitComponentReadyTimeout)); err != nil { return fmt.Errorf("wait karmada search pod status ready timeout: %v", err) } diff --git a/pkg/karmadactl/cmdinit/kubernetes/deploy.go b/pkg/karmadactl/cmdinit/kubernetes/deploy.go index d7234be707ad..5b31557f5791 100644 --- a/pkg/karmadactl/cmdinit/kubernetes/deploy.go +++ b/pkg/karmadactl/cmdinit/kubernetes/deploy.go @@ -351,7 +351,7 @@ func (i *CommandInitOption) createCertsSecrets() error { func (i *CommandInitOption) initKarmadaAPIServer() error { // wait karmada APIServer component ready timeout 120s - waitKarmadaAPIServerComponentReadyTimeout := 120 + waitKarmadaAPIServerComponentReadyTimeout := time.Duration(120) if err := util.CreateOrUpdateService(i.KubeClientSet, i.makeEtcdService(etcdStatefulSetAndServiceName)); err != nil { return err diff --git a/pkg/karmadactl/register/register.go b/pkg/karmadactl/register/register.go index 73ea9bc5738b..97174d56821e 100644 --- a/pkg/karmadactl/register/register.go +++ b/pkg/karmadactl/register/register.go @@ -51,7 +51,7 @@ var ( # Register cluster into karmada control plane with Pull mode. # If '--cluster-name' isn't specified, the cluster of current-context will be used by default. %[1]s register [karmada-apiserver-endpoint] --cluster-name= --token= --discovery-token-ca-cert-hash= - + # UnsafeSkipCAVerification allows token-based discovery without CA verification via CACertHashes. This can weaken # the security of register command since other clusters can impersonate the control-plane. %[1]s register [karmada-apiserver-endpoint] --token= --discovery-token-unsafe-skip-ca-verification=true @@ -357,7 +357,7 @@ func (o *CommandRegisterOption) Run(parentCommand string) error { return err } - if err := cmdutil.WaitForDeploymentRollout(o.memberClusterClient, KarmadaAgentDeployment, int(o.Timeout)); err != nil { + if err := cmdutil.WaitForDeploymentRollout(o.memberClusterClient, KarmadaAgentDeployment, o.Timeout); err != nil { return err } diff --git a/pkg/karmadactl/util/check.go b/pkg/karmadactl/util/check.go index 5152fba3e737..93285607d02c 100644 --- a/pkg/karmadactl/util/check.go +++ b/pkg/karmadactl/util/check.go @@ -6,72 +6,140 @@ import ( "time" appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +// Constants defining labels +const ( + StatusReady = "Ready" + StatusInProgress = "InProgress" + StatusUnknown = "Unknown" ) // WaitForStatefulSetRollout wait for StatefulSet reaches the ready state or timeout. -func WaitForStatefulSetRollout(c kubernetes.Interface, sts *appsv1.StatefulSet, timeoutSeconds int) error { - var lastErr error - pollError := wait.PollImmediate(time.Second, time.Duration(timeoutSeconds)*time.Second, func() (bool, error) { - s, err := c.AppsV1().StatefulSets(sts.GetNamespace()).Get(context.TODO(), sts.GetName(), metav1.GetOptions{}) - if err != nil { - lastErr = err - return false, nil - } - if s.Generation != s.Status.ObservedGeneration { - lastErr = fmt.Errorf("expected generation %d, observed generation: %d", - s.Generation, s.Status.ObservedGeneration) - return false, nil - } - if (s.Spec.Replicas != nil) && (s.Status.UpdatedReplicas < *s.Spec.Replicas) { - lastErr = fmt.Errorf("expected %d replicas, got %d updated replicas", - *s.Spec.Replicas, s.Status.UpdatedReplicas) - return false, nil - } - if s.Status.AvailableReplicas < s.Status.UpdatedReplicas { - lastErr = fmt.Errorf("expected %d replicas, got %d available replicas", - s.Status.UpdatedReplicas, s.Status.AvailableReplicas) - return false, nil +func WaitForStatefulSetRollout(c kubernetes.Interface, sts *appsv1.StatefulSet, timeoutSeconds time.Duration) error { + stsWatcher, err := c.AppsV1().StatefulSets(sts.GetNamespace()).Watch(context.Background(), metav1.SingleObject(sts.ObjectMeta)) + if err != nil { + return err + } + defer stsWatcher.Stop() + + for { + select { + case <-time.After(timeoutSeconds * time.Second): + klog.Errorf("wait for StatefulSets(%s/%s) rollout: timeout", sts.GetNamespace(), sts.GetName()) + return nil + case event, ok := <-stsWatcher.ResultChan(): + if !ok { + return fmt.Errorf("wait for Statefulset(%s/%s) rollout: channel broken", sts.GetNamespace(), sts.GetName()) + } + + status, err := stsStatus(event.Object) + if err != nil { + return err + } else if status == StatusReady { + return nil + } } - return true, nil - }) - if pollError != nil { - return fmt.Errorf("wait for Statefulset(%s/%s) rollout: %v: %v", sts.GetNamespace(), sts.GetName(), pollError, lastErr) } - return nil } // WaitForDeploymentRollout wait for Deployment reaches the ready state or timeout. -func WaitForDeploymentRollout(c kubernetes.Interface, dep *appsv1.Deployment, timeoutSeconds int) error { - var lastErr error - pollError := wait.PollImmediate(time.Second, time.Duration(timeoutSeconds)*time.Second, func() (bool, error) { - d, err := c.AppsV1().Deployments(dep.GetNamespace()).Get(context.TODO(), dep.GetName(), metav1.GetOptions{}) - if err != nil { - lastErr = err - return false, nil - } - if d.Generation != d.Status.ObservedGeneration { - lastErr = fmt.Errorf("current generation %d, observed generation %d", - d.Generation, d.Status.ObservedGeneration) - return false, nil - } - if (d.Spec.Replicas != nil) && (d.Status.UpdatedReplicas < *d.Spec.Replicas) { - lastErr = fmt.Errorf("the number of pods targeted by the deployment (%d pods) is different "+ - "from the number of pods targeted by the deployment that have the desired template spec (%d pods)", - *d.Spec.Replicas, d.Status.UpdatedReplicas) - return false, nil +func WaitForDeploymentRollout(c kubernetes.Interface, dep *appsv1.Deployment, timeoutSeconds time.Duration) error { + depWatcher, err := c.AppsV1().Deployments(dep.GetNamespace()).Watch(context.Background(), metav1.SingleObject(dep.ObjectMeta)) + if err != nil { + return err + } + defer depWatcher.Stop() + + for { + select { + case <-time.After(timeoutSeconds * time.Second): + klog.Errorf("wait for Deployment(%s/%s) rollout: timeout", dep.GetNamespace(), dep.GetName()) + return nil + case event, ok := <-depWatcher.ResultChan(): + if !ok { + return fmt.Errorf("wait for Deployment(%s/%s) rollout: channel broken", dep.GetNamespace(), dep.GetName()) + } + + status, err := deploymentStatus(event.Object) + if err != nil { + return err + } else if status == StatusReady { + return nil + } } - if d.Status.AvailableReplicas < d.Status.UpdatedReplicas { - lastErr = fmt.Errorf("expected %d replicas, got %d available replicas", - d.Status.UpdatedReplicas, d.Status.AvailableReplicas) - return false, nil + } +} + +// Reference https://github.com/kubernetes-sigs/application/blob/e5329b1b083ece8abb4b4efaf738ab9277fbb2ce/controllers/status.go#L78-#L92 +func stsStatus(o runtime.Object) (string, error) { + u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(o) + if err != nil { + return StatusUnknown, err + } + + sts := &appsv1.StatefulSet{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u, sts); err != nil { + return StatusUnknown, err + } + + if sts.Status.ObservedGeneration == sts.Generation && + sts.Status.Replicas == *sts.Spec.Replicas && + sts.Status.ReadyReplicas == *sts.Spec.Replicas && + sts.Status.CurrentReplicas == *sts.Spec.Replicas { + return StatusReady, nil + } + return StatusInProgress, nil +} + +// Reference https://github.com/kubernetes-sigs/application/blob/e5329b1b083ece8abb4b4efaf738ab9277fbb2ce/controllers/status.go#L94-#L132 +// +//nolint:gocyclo +func deploymentStatus(o runtime.Object) (string, error) { + u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(o) + if err != nil { + return StatusUnknown, err + } + + deployment := &appsv1.Deployment{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u, deployment); err != nil { + return StatusUnknown, err + } + + replicaFailure := false + progressing := false + available := false + + for _, condition := range deployment.Status.Conditions { + switch condition.Type { + case appsv1.DeploymentProgressing: + if condition.Status == corev1.ConditionTrue && condition.Reason == "NewReplicaSetAvailable" { + progressing = true + } + case appsv1.DeploymentAvailable: + if condition.Status == corev1.ConditionTrue { + available = true + } + case appsv1.DeploymentReplicaFailure: + if condition.Status == corev1.ConditionTrue { + replicaFailure = true + break + } } - return true, nil - }) - if pollError != nil { - return fmt.Errorf("wait for Deployment(%s/%s) rollout: %v: %v", dep.GetNamespace(), dep.GetName(), pollError, lastErr) } - return nil + + if deployment.Status.ObservedGeneration == deployment.Generation && + deployment.Status.Replicas == *deployment.Spec.Replicas && + deployment.Status.ReadyReplicas == *deployment.Spec.Replicas && + deployment.Status.AvailableReplicas == *deployment.Spec.Replicas && + deployment.Status.Conditions != nil && len(deployment.Status.Conditions) > 0 && + (progressing || available) && !replicaFailure { + return StatusReady, nil + } + return StatusInProgress, nil }