Skip to content

Commit

Permalink
Feat: Machine provider support OnHealthCheck
Browse files Browse the repository at this point in the history
Signed-off-by: malc0lm <[email protected]>
  • Loading branch information
malc0lm committed Mar 22, 2022
1 parent 0174617 commit 9b2e9ec
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 48 deletions.
47 changes: 2 additions & 45 deletions pkg/platform/controller/machine/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,11 @@ import (
"tkestack.io/tke/pkg/platform/controller/machine/deletion"
clusterprovider "tkestack.io/tke/pkg/platform/provider/cluster"
machineprovider "tkestack.io/tke/pkg/platform/provider/machine"
"tkestack.io/tke/pkg/platform/util"
"tkestack.io/tke/pkg/util/apiclient"
"tkestack.io/tke/pkg/util/log"
"tkestack.io/tke/pkg/util/metrics"
)

const (
conditionTypeHealthCheck = "HealthCheck"
failedHealthCheckReason = "FailedHealthCheck"

resyncInternal = 1 * time.Minute
)

Expand Down Expand Up @@ -124,7 +119,7 @@ func (c *Controller) updateMachine(old, obj interface{}) {
}

func (c *Controller) needsUpdate(old *platformv1.Machine, new *platformv1.Machine) bool {
healthCondition := new.GetCondition(conditionTypeHealthCheck)
healthCondition := new.GetCondition(machineprovider.ConditionTypeHealthCheck)
if !reflect.DeepEqual(old.Spec, new.Spec) {
return true

Expand Down Expand Up @@ -312,7 +307,7 @@ func (c *Controller) onUpdate(ctx context.Context, machine *platformv1.Machine)
}

err = provider.OnUpdate(ctx, machine, cluster)
machine = c.checkHealth(ctx, machine)
machine = provider.OnHealthCheck(ctx, machine, cluster)
if err != nil {
// Update status, ignore failure
_, _ = c.platformClient.Machines().UpdateStatus(ctx, machine, metav1.UpdateOptions{})
Expand All @@ -325,41 +320,3 @@ func (c *Controller) onUpdate(ctx context.Context, machine *platformv1.Machine)

return nil
}

// checkHealth refreshes the health status of a machine and returns the same
// machine pointer after mutating it in place.
//
// Machines whose phase is neither MachineRunning nor MachineFailed are
// returned untouched. Otherwise a client set is built for the machine's
// cluster and apiclient.GetNodeByMachineIP is called with the machine IP:
//   - if either step returns an error, the phase becomes MachineFailed and the
//     HealthCheck condition is set to False with the error text as its message;
//   - if both succeed, the phase becomes MachineRunning and the HealthCheck
//     condition is set to True.
func (c *Controller) checkHealth(ctx context.Context, machine *platformv1.Machine) *platformv1.Machine {
	phase := machine.Status.Phase
	if phase != platformv1.MachineRunning && phase != platformv1.MachineFailed {
		return machine
	}

	// Run the two probe steps in sequence; the first error (if any) wins.
	clientset, err := util.BuildExternalClientSetWithName(ctx, c.platformClient, machine.Spec.ClusterName)
	if err == nil {
		_, err = apiclient.GetNodeByMachineIP(ctx, clientset, machine.Spec.IP)
	}

	cond := platformv1.MachineCondition{Type: conditionTypeHealthCheck}
	if err != nil {
		machine.Status.Phase = platformv1.MachineFailed
		cond.Status = platformv1.ConditionFalse
		cond.Reason = failedHealthCheckReason
		cond.Message = err.Error()
	} else {
		machine.Status.Phase = platformv1.MachineRunning
		cond.Status = platformv1.ConditionTrue
	}
	machine.SetCondition(cond)

	log.FromContext(ctx).Info("Update machine health status", "phase", machine.Status.Phase)

	return machine
}
7 changes: 4 additions & 3 deletions pkg/platform/controller/machine/machine_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
platformv1 "tkestack.io/tke/api/platform/v1"
machineprovider "tkestack.io/tke/pkg/platform/provider/machine"
)

func newMachineForTest(resourcesVersion string, spec *platformv1.MachineSpec, phase platformv1.MachinePhase, conditions []platformv1.MachineCondition) *platformv1.Machine {
Expand All @@ -41,7 +42,7 @@ func newMachineForTest(resourcesVersion string, spec *platformv1.MachineSpec, ph
Phase: platformv1.MachineRunning,
Conditions: []platformv1.MachineCondition{
{
Type: conditionTypeHealthCheck,
Type: machineprovider.ConditionTypeHealthCheck,
Status: platformv1.ConditionTrue,
LastProbeTime: v1.Now(),
},
Expand Down Expand Up @@ -180,7 +181,7 @@ func TestController_needsUpdate(t *testing.T) {
name: "health check is not long enough",
args: func() args {
new := newMachineForTest("new", nil, platformv1.MachinePhase(""), []platformv1.MachineCondition{{
Type: conditionTypeHealthCheck,
Type: machineprovider.ConditionTypeHealthCheck,
Status: platformv1.ConditionTrue,
LastProbeTime: v1.NewTime(time.Now().Add(-resyncInternal / 2))}})
return args{new, new}
Expand All @@ -191,7 +192,7 @@ func TestController_needsUpdate(t *testing.T) {
name: "health check is long enough",
args: func() args {
new := newMachineForTest("new", nil, platformv1.MachinePhase(""), []platformv1.MachineCondition{{
Type: conditionTypeHealthCheck,
Type: machineprovider.ConditionTypeHealthCheck,
Status: platformv1.ConditionTrue,
LastProbeTime: v1.NewTime(time.Now().Add(-resyncInternal - 1))}})
return args{new, new}
Expand Down
46 changes: 46 additions & 0 deletions pkg/platform/provider/machine/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import (
"strings"
"time"

"tkestack.io/tke/pkg/util/apiclient"
"tkestack.io/tke/pkg/util/log"

"github.com/thoas/go-funk"
"k8s.io/apimachinery/pkg/util/validation/field"
"tkestack.io/tke/api/platform"

platformv1 "tkestack.io/tke/api/platform/v1"
typesv1 "tkestack.io/tke/pkg/platform/types/v1"
)
Expand All @@ -44,6 +46,9 @@ const (
ReasonFailedDelete = "FailedDelete"

ConditionTypeDone = "EnsureDone"

ConditionTypeHealthCheck = "HealthCheck"
FailedHealthCheckReason = "FailedHealthCheck"
)

type APIProvider interface {
Expand All @@ -62,6 +67,9 @@ type ControllerProvider interface {
OnCreate(ctx context.Context, machine *platformv1.Machine, cluster *typesv1.Cluster) error
OnUpdate(ctx context.Context, machine *platformv1.Machine, cluster *typesv1.Cluster) error
OnDelete(ctx context.Context, machine *platformv1.Machine, cluster *typesv1.Cluster) error
// OnHealthCheck may be overridden by a provider implementation. The default
// implementation looks up the tenant cluster node by the machine IP.
OnHealthCheck(ctx context.Context, machine *platformv1.Machine, cluster *typesv1.Cluster) *platformv1.Machine
}

// Provider defines a set of response interfaces for specific machine
Expand Down Expand Up @@ -234,6 +242,44 @@ func (p *DelegateProvider) OnDelete(ctx context.Context, machine *platformv1.Mac
return nil
}

// OnHealthCheck is the default health probe of the delegate provider. It
// mutates the given machine in place and returns that same pointer.
//
// Machines whose phase is neither MachineRunning nor MachineFailed are
// returned untouched. Otherwise the cluster's client set is obtained and
// apiclient.GetNodeByMachineIP is called with the machine IP:
//   - if either step returns an error, the phase becomes MachineFailed and the
//     HealthCheck condition is set to False with FailedHealthCheckReason and
//     the error text as its message;
//   - if both succeed, the phase becomes MachineRunning and the HealthCheck
//     condition is set to True.
func (p *DelegateProvider) OnHealthCheck(ctx context.Context, machine *platformv1.Machine, cluster *typesv1.Cluster) *platformv1.Machine {
	phase := machine.Status.Phase
	if phase != platformv1.MachineRunning && phase != platformv1.MachineFailed {
		return machine
	}

	// Run the two probe steps in sequence; the first error (if any) wins.
	clientset, err := cluster.Clientset()
	if err == nil {
		_, err = apiclient.GetNodeByMachineIP(ctx, clientset, machine.Spec.IP)
	}

	cond := platformv1.MachineCondition{Type: ConditionTypeHealthCheck}
	if err != nil {
		machine.Status.Phase = platformv1.MachineFailed
		cond.Status = platformv1.ConditionFalse
		cond.Reason = FailedHealthCheckReason
		cond.Message = err.Error()
	} else {
		machine.Status.Phase = platformv1.MachineRunning
		cond.Status = platformv1.ConditionTrue
	}
	machine.SetCondition(cond)

	log.FromContext(ctx).Info("Update machine health status", "phase", machine.Status.Phase)

	return machine
}

// NeedUpdate always returns false in the delegate provider; the old and new
// machine objects are not inspected.
func (p *DelegateProvider) NeedUpdate(old, new *platformv1.Machine) bool {
return false
}
Expand Down

0 comments on commit 9b2e9ec

Please sign in to comment.