Skip to content

Commit

Permalink
Merge pull request #1135 from hongkailiu/OTA-1411
Browse files Browse the repository at this point in the history
OTA-1411: USC: Maintain status insights for ClusterOperator resources
  • Loading branch information
openshift-merge-bot[bot] authored Jan 20, 2025
2 parents e90705b + 6b5af28 commit 8a8bca5
Show file tree
Hide file tree
Showing 4 changed files with 905 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,18 @@ rules:
- apiGroups:
- config.openshift.io
resources:
- clusteroperators
- clusterversions
verbs:
- get
- list
- watch
- apiGroups:
- apps
resources:
- deployments
verbs:
- get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
Expand Down
255 changes: 233 additions & 22 deletions pkg/updatestatus/controlplaneinformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package updatestatus

import (
"context"
"errors"
"fmt"
"strings"
"time"

"gopkg.in/yaml.v3"
"k8s.io/apimachinery/pkg/api/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
appsv1client "k8s.io/client-go/kubernetes/typed/apps/v1"
"k8s.io/klog/v2"

configv1 "github.com/openshift/api/config/v1"
Expand All @@ -20,75 +23,272 @@ import (
"github.com/openshift/cluster-version-operator/lib/resourcemerge"
)

// controlPlaneInformerController is the controller that monitors health of the control plane-related resources (initially,
// just ClusterVersion but will need to handle ClusterOperators too) and produces insights for control plane update.
// controlPlaneInformerController is the controller that monitors health of the control plane-related resources
// and produces insights for control plane update.
// NOTE(review): this span is taken from a rendered diff; the clusterVersions and recorder fields appear twice
// because the superseded and the replacement lines are both shown. Confirm against the actual file.
type controlPlaneInformerController struct {
clusterVersions configv1listers.ClusterVersionLister
recorder events.Recorder
// clusterVersions is the lister used by sync to look up ClusterVersion objects by name
clusterVersions configv1listers.ClusterVersionLister
// clusterOperators is the lister used by sync to look up ClusterOperator objects by name
clusterOperators configv1listers.ClusterOperatorLister
// recorder is the event recorder the controller is created with
recorder events.Recorder

// sendInsight should be called to send produced insights to the update status controller
sendInsight sendInsightFn

// appsClient is handed to assessClusterOperator, which uses it to read the machine-config-operator Deployment
// when assessing the machine-config ClusterOperator
appsClient appsv1client.AppsV1Interface

// now is a function that returns the current time, used for testing
now func() metav1.Time
}

// newControlPlaneInformerController wires up the control plane informer controller. It takes the ClusterVersion and
// ClusterOperator listers and informers from configInformers, derives a recorder with the "control-plane-informer"
// component suffix, and registers c.sync to run for every ClusterVersion event and for those ClusterOperator events
// that pass clusterOperatorEventFilterFunc. Queue keys for both informers are produced by configApiQueueKeys.
//
// NOTE(review): this span is taken from a rendered diff; the struct literal below lists some fields twice because
// the superseded and the replacement lines are both shown. Confirm against the actual file.
func newControlPlaneInformerController(
appsClient appsv1client.AppsV1Interface,
configInformers configinformers.SharedInformerFactory,
recorder events.Recorder,
sendInsight sendInsightFn,
) factory.Controller {
cpiRecorder := recorder.WithComponentSuffix("control-plane-informer")

c := &controlPlaneInformerController{
clusterVersions: configInformers.Config().V1().ClusterVersions().Lister(),
recorder: cpiRecorder,
sendInsight: sendInsight,
clusterVersions: configInformers.Config().V1().ClusterVersions().Lister(),
clusterOperators: configInformers.Config().V1().ClusterOperators().Lister(),
recorder: cpiRecorder,
sendInsight: sendInsight,
appsClient: appsClient,

// production code uses the real clock; the field exists so tests can substitute a fixed time
now: metav1.Now,
}

cvInformer := configInformers.Config().V1().ClusterVersions().Informer()
coInformer := configInformers.Config().V1().ClusterOperators().Informer()

controller := factory.New().
// call sync on ClusterVersion changes
WithInformersQueueKeysFunc(configApiQueueKeys, cvInformer).
// call sync on ClusterOperator changes with a filter
WithFilteredEventsInformersQueueKeysFunc(configApiQueueKeys, clusterOperatorEventFilterFunc, coInformer).
WithSync(c.sync).
ToController("ControlPlaneInformer", c.recorder)

return controller
}

// clusterOperatorEventFilterFunc decides whether an informer event should be queued for sync. It accepts only
// *configv1.ClusterOperator objects that carry at least one annotation whose key starts with
// "exclude.release.openshift.io/" or "include.release.openshift.io/"; every other object is rejected.
func clusterOperatorEventFilterFunc(obj interface{}) bool {
	operator, isOperator := obj.(*configv1.ClusterOperator)
	if !isOperator {
		return false
	}

	for key := range operator.Annotations {
		switch {
		case strings.HasPrefix(key, "exclude.release.openshift.io/"),
			strings.HasPrefix(key, "include.release.openshift.io/"):
			return true
		}
	}
	return false
}

// Kind names used as the first segment of controller queue keys ("<kind>/<name>"): configApiQueueKeys builds
// keys with them and sync dispatches on them.
const (
// clusterVersionKindName marks queue keys that refer to a ClusterVersion
clusterVersionKindName = "ClusterVersion"
// clusterOperatorKindName marks queue keys that refer to a ClusterOperator
clusterOperatorKindName = "ClusterOperator"
)

// sync is called for any controller event. The queue key has the form "<kind>/<name>" (see parseQueueKey); sync
// assesses the state and health of the control plane resource the key names (a ClusterVersion or a ClusterOperator),
// produces an insight, and sends it to the update status controller through sendInsight. Status
// insights are not stored between calls, so every call produces a fresh insight. This means some fields do not follow
// conventions, like LastTransitionTime in the Updating condition. Proper continuous insight maintenance will need to
// be added later (not yet sure whether on consumer or producer side).
//
// A resource that is no longer found in its lister is silently skipped (nil is returned); a key with an unknown
// kind or the wrong shape is reported as an error.
//
// NOTE(review): this span is taken from a rendered diff, so lines superseded by the change (the signature taking
// `_ context.Context`, the direct c.clusterVersions.Get(queueKey) lookup and the ClusterVersion-only log line)
// appear next to their replacements. Confirm against the actual file before editing the code.
func (c *controlPlaneInformerController) sync(_ context.Context, syncCtx factory.SyncContext) error {
func (c *controlPlaneInformerController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
queueKey := syncCtx.QueueKey()

clusterVersion, err := c.clusterVersions.Get(queueKey)
t, name, err := parseQueueKey(queueKey)
if err != nil {
if errors.IsNotFound(err) {
// TODO: Handle deletes by deleting the status insight
return nil
}
return err
return fmt.Errorf("failed to parse queue key: %w", err)
}

now := c.now()
insight := assessClusterVersion(clusterVersion, now)
msg := makeInsightMsgForClusterVersion(insight, now)
var msg informerMsg
switch t {
case clusterVersionKindName:
clusterVersion, err := c.clusterVersions.Get(name)
if err != nil {
if kerrors.IsNotFound(err) {
// TODO: Handle deletes by deleting the status insight
return nil
}
return err
}

now := c.now()
insight := assessClusterVersion(clusterVersion, now)
msg = makeInsightMsgForClusterVersion(insight, now)

case clusterOperatorKindName:
// The target version comes from the ClusterVersion named "version"; any lookup failure here,
// including NotFound, is returned as an error
clusterVersion, err := c.clusterVersions.Get("version")
if err != nil {
return err
}
targetVersion := clusterVersion.Status.Desired.Version

clusterOperator, err := c.clusterOperators.Get(name)
if err != nil {
if kerrors.IsNotFound(err) {
// TODO: Handle deletes by deleting the status insight
return nil
}
return err
}

now := c.now()
insight, err := assessClusterOperator(ctx, clusterOperator, targetVersion, c.appsClient, now)
if err != nil {
return fmt.Errorf("failed to assess cluster operator %s: %w", name, err)
}
msg = makeInsightMsgForClusterOperator(insight, now)
default:
return fmt.Errorf("invalid queue key %s with unexpected type %s", queueKey, t)
}
// The serialized insight is appended to the log line only at verbosity 4 or higher
var msgForLog string
if klog.V(4).Enabled() {
msgForLog = fmt.Sprintf(" | msg=%s", string(msg.insight))
}
klog.V(2).Infof("CPI :: Syncing ClusterVersion %s%s", clusterVersion.Name, msgForLog)
klog.V(2).Infof("CPI :: Syncing %s %s%s", t, name, msgForLog)
c.sendInsight(msg)

return nil
}

// makeInsightMsgForClusterOperator wraps the given ClusterOperatorStatusInsight into an informerMsg. The message
// uid is "usc-co-" followed by the insight name, and the payload is the YAML serialization of an Insight that
// carries that uid, the acquisition time and the ClusterOperator insight.
func makeInsightMsgForClusterOperator(coInsight *ClusterOperatorStatusInsight, acquiredAt metav1.Time) informerMsg {
	msgUID := fmt.Sprintf("usc-co-%s", coInsight.Name)

	union := InsightUnion{
		Type:                         ClusterOperatorStatusInsightType,
		ClusterOperatorStatusInsight: coInsight,
	}

	// The marshalling error is deliberately dropped: serialization is a stopgap until a proper API exists,
	// at which point we will not need to serialize ourselves.
	serialized, _ := yaml.Marshal(Insight{
		UID:          msgUID,
		AcquiredAt:   acquiredAt,
		InsightUnion: union,
	})

	return informerMsg{uid: msgUID, insight: serialized}
}

// assessClusterOperator builds a ClusterOperatorStatusInsight for the given ClusterOperator. The insight carries
// two conditions, both stamped with now as LastTransitionTime:
//
//   - Updating: False/Updated when the "operator" version equals targetVersion and, if the operator reports an
//     "operator-image" version, that version equals the image pull spec returned by getImagePullSpec. Otherwise it
//     mirrors the Progressing condition (True -> True/Progressing, False -> False/Pending), and stays
//     Unknown/CannotDetermine when Progressing is missing or has any other status.
//   - Healthy: True/AsExpected unless the Available condition is missing (Unknown/Unavailable), Available is not
//     True (False/Unavailable), or Degraded is True (False/Degraded).
//
// An error from getImagePullSpec is returned as is, except for operatorImageNotImplemented which is tolerated.
func assessClusterOperator(ctx context.Context, operator *configv1.ClusterOperator, targetVersion string, appsClient appsv1client.AppsV1Interface, now metav1.Time) (*ClusterOperatorStatusInsight, error) {
	imagePullSpec, err := getImagePullSpec(ctx, operator.Name, appsClient)
	if err != nil && !errors.Is(err, operatorImageNotImplemented) {
		return nil, err
	}

	// Scan the reported versions once, tracking whether "operator-image" is reported at all, whether it matches
	// the expected pull spec, and whether "operator" already matches the target version.
	var reportsOperatorImage, imageMatches, versionMatches bool
	for i := range operator.Status.Versions {
		reported := &operator.Status.Versions[i]
		switch reported.Name {
		case "operator-image":
			reportsOperatorImage = true
			if imagePullSpec != "" && reported.Version == imagePullSpec {
				imageMatches = true
			}
		case "operator":
			if reported.Version == targetVersion {
				versionMatches = true
			}
		}
	}

	// "operator-image" might not be implemented by every cluster operator
	updated := versionMatches && (!reportsOperatorImage || imageMatches)

	// Pick up the conditions of interest; when a type occurs more than once the last occurrence wins.
	var available, degraded, progressing *configv1.ClusterOperatorStatusCondition
	for i := range operator.Status.Conditions {
		current := &operator.Status.Conditions[i]
		switch current.Type {
		case configv1.OperatorAvailable:
			available = current
		case configv1.OperatorDegraded:
			degraded = current
		case configv1.OperatorProgressing:
			progressing = current
		}
	}

	updating := metav1.Condition{
		Type:               string(ClusterOperatorStatusInsightUpdating),
		Status:             metav1.ConditionUnknown,
		Reason:             string(ClusterOperatorUpdatingCannotDetermine),
		LastTransitionTime: now,
	}
	switch {
	case updated:
		updating.Status = metav1.ConditionFalse
		updating.Reason = string(ClusterOperatorUpdatingReasonUpdated)
	case progressing == nil:
		// Nothing to go by: keep Unknown/CannotDetermine.
	case progressing.Status == configv1.ConditionTrue:
		updating.Status = metav1.ConditionTrue
		updating.Reason = string(ClusterOperatorUpdatingReasonProgressing)
		updating.Message = progressing.Message
	case progressing.Status == configv1.ConditionFalse:
		updating.Status = metav1.ConditionFalse
		updating.Reason = string(ClusterOperatorUpdatingReasonPending)
		updating.Message = progressing.Message
	}

	health := metav1.Condition{
		Type:               string(ClusterOperatorStatusInsightHealthy),
		Status:             metav1.ConditionTrue,
		Reason:             string(ClusterOperatorHealthyReasonAsExpected),
		LastTransitionTime: now,
	}
	switch {
	case available == nil:
		health.Status = metav1.ConditionUnknown
		health.Reason = string(ClusterOperatorHealthyReasonUnavailable)
		health.Message = "The cluster operator is unavailable because the available condition is not found in the cluster operator's status"
	case available.Status != configv1.ConditionTrue:
		health.Status = metav1.ConditionFalse
		health.Reason = string(ClusterOperatorHealthyReasonUnavailable)
		health.Message = available.Message
	case degraded != nil && degraded.Status == configv1.ConditionTrue:
		health.Status = metav1.ConditionFalse
		health.Reason = string(ClusterOperatorHealthyReasonDegraded)
		health.Message = degraded.Message
	}

	result := &ClusterOperatorStatusInsight{
		Name: operator.Name,
		Resource: ResourceRef{
			Resource: "clusteroperators",
			Group:    configv1.GroupName,
			Name:     operator.Name,
		},
		Conditions: []metav1.Condition{updating, health},
	}
	return result, nil
}

// operatorImageNotImplemented is the sentinel error getImagePullSpec returns for cluster operators it has no image
// lookup for. assessClusterOperator tolerates it (checked with errors.Is) instead of failing the assessment.
var operatorImageNotImplemented = errors.New("operator-image not implemented in the versions from cluster operator's status")

// getImagePullSpec returns the image pull spec of the operator behind the cluster operator called name.
//
// Only "machine-config" is supported: its pull spec is the image of the "machine-config-operator" container in the
// "machine-config-operator" Deployment in the "openshift-machine-config-operator" namespace, fetched through
// appsClient. For every other name operatorImageNotImplemented is returned. Errors are also returned when
// appsClient is nil, when the Deployment cannot be fetched, or when it has no such container.
func getImagePullSpec(ctx context.Context, name string, appsClient appsv1client.AppsV1Interface) (string, error) {
	// We may add here retrieval of the image pull spec for other COs when they implement "operator-image" in the status.versions
	if name != "machine-config" {
		return "", operatorImageNotImplemented
	}

	// It is known that the image pull spec for co/machine-config can be accessed from the deployment
	if appsClient == nil {
		return "", errors.New("apps client is nil")
	}
	deployment, err := appsClient.Deployments("openshift-machine-config-operator").Get(ctx, "machine-config-operator", metav1.GetOptions{})
	if err != nil {
		return "", err
	}

	containers := deployment.Spec.Template.Spec.Containers
	for i := range containers {
		if containers[i].Name == "machine-config-operator" {
			return containers[i].Image, nil
		}
	}
	return "", errors.New("machine-config-operator container not found")
}

// makeInsightMsgForClusterVersion creates an informerMsg for the given ClusterVersionStatusInsight. It defines a uid
// name and serializes the insight as YAML. Serialization is convenient because it prevents any data sharing issues
// between controllers.
Expand Down Expand Up @@ -270,16 +470,27 @@ func versionsFromHistory(history []configv1.UpdateHistory) ControlPlaneUpdateVer
return versions
}

// parseQueueKey splits a queue key of the form "<kind>/<name>" into its kind and name. The key must contain
// exactly one "/" separator; anything else yields an "invalid queue key" error and two empty strings. Either
// segment may be empty.
func parseQueueKey(queueKey string) (string, string, error) {
	if strings.Count(queueKey, "/") != 1 {
		return "", "", fmt.Errorf("invalid queue key: %s", queueKey)
	}
	sep := strings.IndexByte(queueKey, '/')
	return queueKey[:sep], queueKey[sep+1:], nil
}

// configApiQueueKeys translates an informer object into the queue keys that sync should be called with. A nil
// object yields no keys. ClusterVersion and ClusterOperator objects yield a single "<kind>/<name>" key built from
// clusterVersionKindName / clusterOperatorKindName, which is the form parseQueueKey expects. Any other type is
// treated as a programming error.
//
// NOTE(review): this span is taken from a rendered diff, so superseded lines (the bare o.Name key, the
// klog.Fatalf call and its trailing `return nil`) appear next to their replacements. Confirm against the
// actual file before editing the code.
func configApiQueueKeys(object runtime.Object) []string {
if object == nil {
return nil
}

switch o := object.(type) {
case *configv1.ClusterVersion:
return []string{o.Name}
return []string{fmt.Sprintf("%s/%s", clusterVersionKindName, o.Name)}
case *configv1.ClusterOperator:
return []string{fmt.Sprintf("%s/%s", clusterOperatorKindName, o.Name)}
}

klog.Fatalf("USC :: Unknown object type: %T", object)
return nil
// Log the unexpected type before panicking with the same message
msg := fmt.Sprintf("USC :: Unknown object type: %T", object)
klog.Error(msg)
panic(msg)
}
Loading

0 comments on commit 8a8bca5

Please sign in to comment.