Skip to content

Commit

Permalink
addon consume rollout helpers
Browse files Browse the repository at this point in the history
Signed-off-by: haoqing0110 <[email protected]>
  • Loading branch information
haoqing0110 committed Jul 20, 2023
1 parent d52ab94 commit c2b4cda
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 118 deletions.
72 changes: 41 additions & 31 deletions pkg/addon/controllers/addonconfiguration/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"open-cluster-management.io/addon-framework/pkg/index"
addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
addonv1alpha1client "open-cluster-management.io/api/client/addon/clientset/versioned"
addoninformerv1alpha1 "open-cluster-management.io/api/client/addon/informers/externalversions/addon/v1alpha1"
addonlisterv1alpha1 "open-cluster-management.io/api/client/addon/listers/addon/v1alpha1"
clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterinformersv1beta1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta1"
clusterlisterv1beta1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
Expand All @@ -24,12 +27,29 @@ import (
"open-cluster-management.io/ocm/pkg/common/queue"
)

type placementDecisionGetter struct {
clusterClient *clusterclientset.Clientset
}

func (pdl placementDecisionGetter) List(selector labels.Selector, namespace string) ([]*clusterv1beta1.PlacementDecision, error) {
decisionList, err := pdl.clusterClient.ClusterV1beta1().PlacementDecisions(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return nil, err
}
var decisions []*clusterv1beta1.PlacementDecision
for i := range decisionList.Items {
decisions = append(decisions, &decisionList.Items[i])
}
return decisions, nil
}

// addonConfigurationController is a controller to update configuration of mca with the following order
// 1. use configuration in mca spec if it is set
// 2. use configuration in install strategy
// 3. use configuration in the default configuration in cma
type addonConfigurationController struct {
addonClient addonv1alpha1client.Interface
clusterClient *clusterclientset.Clientset
clusterManagementAddonLister addonlisterv1alpha1.ClusterManagementAddOnLister
managedClusterAddonIndexer cache.Indexer
addonFilterFunc factory.EventFilterFunc
Expand All @@ -53,6 +73,7 @@ const (

func NewAddonConfigurationController(
addonClient addonv1alpha1client.Interface,
clusterClient *clusterclientset.Clientset,
addonInformers addoninformerv1alpha1.ManagedClusterAddOnInformer,
clusterManagementAddonInformers addoninformerv1alpha1.ClusterManagementAddOnInformer,
placementInformer clusterinformersv1beta1.PlacementInformer,
Expand All @@ -62,6 +83,7 @@ func NewAddonConfigurationController(
) factory.Controller {
c := &addonConfigurationController{
addonClient: addonClient,
clusterClient: clusterClient,
clusterManagementAddonLister: clusterManagementAddonInformers.Lister(),
managedClusterAddonIndexer: addonInformers.Informer().GetIndexer(),
placementLister: placementInformer.Lister(),
Expand Down Expand Up @@ -156,50 +178,38 @@ func (c *addonConfigurationController) buildConfigurationGraph(cma *addonv1alpha
// check each install strategy in status
var errs []error
for _, installProgression := range cma.Status.InstallProgressions {
clusters, err := c.getClustersByPlacement(installProgression.PlacementRef.Name, installProgression.PlacementRef.Namespace)
if errors.IsNotFound(err) {
klog.V(2).Infof("placement %s/%s is not found for addon %s", installProgression.PlacementRef.Namespace, installProgression.PlacementRef.Name, cma.Name)
continue
}
if err != nil {
errs = append(errs, err)
continue
}

for _, installStrategy := range cma.Spec.InstallStrategy.Placements {
if installStrategy.PlacementRef == installProgression.PlacementRef {
graph.addPlacementNode(installStrategy, installProgression, clusters)
if installStrategy.PlacementRef != installProgression.PlacementRef {
continue
}

pdTracker, placement, err := c.getClustersPlacementDecisionClustersTracker(installProgression.PlacementRef.Name, installProgression.PlacementRef.Namespace)
if errors.IsNotFound(err) {
klog.V(2).Infof("placement %s/%s is not found for addon %s", installProgression.PlacementRef.Namespace, installProgression.PlacementRef.Name, cma.Name)
continue
}
if err != nil {
errs = append(errs, err)
continue
}
graph.addPlacementNode(installStrategy, installProgression, pdTracker, placement)
}
}

return graph, utilerrors.NewAggregate(errs)
}

func (c *addonConfigurationController) getClustersByPlacement(name, namespace string) ([]string, error) {
var clusters []string
func (c *addonConfigurationController) getClustersPlacementDecisionClustersTracker(name, namespace string) (*clusterv1beta1.PlacementDecisionClustersTracker, *clusterv1beta1.Placement, error) {
if c.placementLister == nil || c.placementDecisionLister == nil {
return clusters, nil
}
_, err := c.placementLister.Placements(namespace).Get(name)
if err != nil {
return clusters, err
return nil, nil, nil
}

decisionSelector := labels.SelectorFromSet(labels.Set{
clusterv1beta1.PlacementLabel: name,
})
decisions, err := c.placementDecisionLister.PlacementDecisions(namespace).List(decisionSelector)
placement, err := c.placementLister.Placements(namespace).Get(name)
if err != nil {
return clusters, err
}

for _, d := range decisions {
for _, sd := range d.Status.Decisions {
clusters = append(clusters, sd.ClusterName)
}
return nil, nil, err
}

return clusters, nil
pdtracker := clusterv1beta1.NewPlacementDecisionClustersTracker(placement, placementDecisionGetter{clusterClient: c.clusterClient}, sets.Set[string]{})
pdtracker.Get()
return pdtracker, placement, nil
}
136 changes: 50 additions & 86 deletions pkg/addon/controllers/addonconfiguration/graph.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package addonconfiguration

import (
"fmt"
"math"
"sort"
"strconv"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"

addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
)

var (
Expand All @@ -30,9 +27,10 @@ type configurationGraph struct {

// installStrategyNode is a node in configurationGraph defined by a install strategy
type installStrategyNode struct {
placementRef addonv1alpha1.PlacementRef
maxConcurrency intstr.IntOrString
desiredConfigs addonConfigMap
placementRef addonv1alpha1.PlacementRef
pdTracker *clusterv1beta1.PlacementDecisionClustersTracker
rolloutStrategy clusterv1alpha1.RolloutStrategy
desiredConfigs addonConfigMap
// children keeps a map of addons node as the children of this node
children map[string]*addonNode
clusters sets.Set[string]
Expand All @@ -44,45 +42,44 @@ type addonNode struct {
desiredConfigs addonConfigMap
mca *addonv1alpha1.ManagedClusterAddOn
// record mca upgrade status
mcaUpgradeStatus upgradeStatus
mcaUpgradeStatus clusterv1alpha1.RolloutStatus
}

type upgradeStatus int

const (
// mca desired configs not synced from desiredConfigs yet
toupgrade upgradeStatus = iota
// mca desired configs upgraded and last applied configs not upgraded
upgrading
// both desired configs and last applied configs are upgraded
upgraded
)

type addonConfigMap map[addonv1alpha1.ConfigGroupResource]addonv1alpha1.ConfigReference

// set addon upgrade status
func (n *addonNode) setUpgradeStatus() {
if len(n.mca.Status.ConfigReferences) != len(n.desiredConfigs) {
n.mcaUpgradeStatus = toupgrade
n.mcaUpgradeStatus = clusterv1alpha1.ToUpgrade
return
}

upgradeSucceed := true
for _, cond := range n.mca.Status.Conditions {
if cond.Type == addonv1alpha1.ManagedClusterAddOnConditionProgressing && cond.Reason == addonv1alpha1.ProgressingReasonUpgradeFailed {
upgradeSucceed = false
}
}

for _, actual := range n.mca.Status.ConfigReferences {
if desired, ok := n.desiredConfigs[actual.ConfigGroupResource]; ok {
if !equality.Semantic.DeepEqual(desired.DesiredConfig, actual.DesiredConfig) {
n.mcaUpgradeStatus = toupgrade
n.mcaUpgradeStatus = clusterv1alpha1.ToUpgrade
return
} else if !equality.Semantic.DeepEqual(actual.LastAppliedConfig, actual.DesiredConfig) {
n.mcaUpgradeStatus = upgrading
if !upgradeSucceed {
n.mcaUpgradeStatus = clusterv1alpha1.UpgradeFailed
} else {
n.mcaUpgradeStatus = clusterv1alpha1.Upgrading
}
return
}
} else {
n.mcaUpgradeStatus = toupgrade
n.mcaUpgradeStatus = clusterv1alpha1.ToUpgrade
return
}
}

n.mcaUpgradeStatus = upgraded
n.mcaUpgradeStatus = clusterv1alpha1.UpgradeSucceed
}

func (d addonConfigMap) copy() addonConfigMap {
Expand All @@ -97,7 +94,6 @@ func newGraph(supportedConfigs []addonv1alpha1.ConfigMeta, defaultConfigReferenc
graph := &configurationGraph{
nodes: []*installStrategyNode{},
defaults: &installStrategyNode{
maxConcurrency: maxMaxConcurrency,
desiredConfigs: map[addonv1alpha1.ConfigGroupResource]addonv1alpha1.ConfigReference{},
children: map[string]*addonNode{},
},
Expand Down Expand Up @@ -145,25 +141,29 @@ func (g *configurationGraph) addAddonNode(mca *addonv1alpha1.ManagedClusterAddOn
func (g *configurationGraph) addPlacementNode(
installStrategy addonv1alpha1.PlacementStrategy,
installProgression addonv1alpha1.InstallProgression,
clusters []string,
pdTracker *clusterv1beta1.PlacementDecisionClustersTracker,
placement *clusterv1beta1.Placement,
) {
placementRef := installProgression.PlacementRef
installConfigReference := installProgression.ConfigReferences

node := &installStrategyNode{
placementRef: placementRef,
maxConcurrency: maxMaxConcurrency,
desiredConfigs: g.defaults.desiredConfigs,
children: map[string]*addonNode{},
clusters: sets.New[string](clusters...),
placementRef: placementRef,
pdTracker: pdTracker,
rolloutStrategy: installStrategy.RolloutStrategy,
desiredConfigs: g.defaults.desiredConfigs,
children: map[string]*addonNode{},
clusters: pdTracker.Existing([]clusterv1beta1.GroupKey{}),
}

// set max concurrency
if installStrategy.RolloutStrategy.Type == addonv1alpha1.AddonRolloutStrategyRollingUpdate {
if installStrategy.RolloutStrategy.RollingUpdate != nil {
node.maxConcurrency = installStrategy.RolloutStrategy.RollingUpdate.MaxConcurrency
} else {
node.maxConcurrency = defaultMaxConcurrency
// set MaxConcurrency
if node.rolloutStrategy.Type == clusterv1alpha1.Progressive {
if node.rolloutStrategy.Progressive == nil {
node.rolloutStrategy.Progressive = &clusterv1alpha1.RolloutProgressive{
MaxConcurrency: placement.Spec.DecisionStrategy.GroupStrategy.ClustersPerDecisionGroup,
}
} else if node.rolloutStrategy.Progressive.MaxConcurrency.IntVal == 0 {
node.rolloutStrategy.Progressive.MaxConcurrency = placement.Spec.DecisionStrategy.GroupStrategy.ClustersPerDecisionGroup
}
}

Expand All @@ -183,7 +183,7 @@ func (g *configurationGraph) addPlacementNode(
}

// remove addon in defaults and other placements.
for _, cluster := range clusters {
for _, cluster := range node.clusters.UnsortedList() {
if _, ok := g.defaults.children[cluster]; ok {
node.addNode(g.defaults.children[cluster].mca)
delete(g.defaults.children, cluster)
Expand Down Expand Up @@ -253,10 +253,10 @@ func (n *installStrategyNode) addNode(addon *addonv1alpha1.ManagedClusterAddOn)
n.children[addon.Namespace].setUpgradeStatus()
}

func (n *installStrategyNode) addonUpgraded() int {
func (n *installStrategyNode) addonUpgradeSucceed() int {
count := 0
for _, addon := range n.children {
if desiredConfigsEqual(addon.desiredConfigs, n.desiredConfigs) && addon.mcaUpgradeStatus == upgraded {
if desiredConfigsEqual(addon.desiredConfigs, n.desiredConfigs) && addon.mcaUpgradeStatus == clusterv1alpha1.UpgradeSucceed {
count += 1
}
}
Expand All @@ -266,7 +266,7 @@ func (n *installStrategyNode) addonUpgraded() int {
func (n *installStrategyNode) addonUpgrading() int {
count := 0
for _, addon := range n.children {
if desiredConfigsEqual(addon.desiredConfigs, n.desiredConfigs) && addon.mcaUpgradeStatus == upgrading {
if desiredConfigsEqual(addon.desiredConfigs, n.desiredConfigs) && addon.mcaUpgradeStatus == clusterv1alpha1.Upgrading {
count += 1
}
}
Expand All @@ -277,57 +277,21 @@ func (n *installStrategyNode) addonUpgrading() int {
func (n *installStrategyNode) addonToUpdate() []*addonNode {
var addons []*addonNode

// sort the children by key
keys := make([]string, 0, len(n.children))
for k := range n.children {
keys = append(keys, k)
objs := map[string]clusterv1alpha1.RolloutStatus{}
for k, node := range n.children {
objs[k] = node.mcaUpgradeStatus
}
sort.Strings(keys)

total := len(n.clusters)
if total == 0 {
total = len(n.children)
}

length, _ := parseMaxConcurrency(n.maxConcurrency, total)
if length == 0 {
return addons
}
rolloutHandler := clusterv1alpha1.NewRolloutHandler(n.rolloutStrategy, *n.pdTracker, objs)
rolloutClusters := rolloutHandler.GetRolloutCluster()

for i, k := range keys {
if (i%length == 0) && len(addons) > 0 {
return addons
}

addon := n.children[k]
if addon.mcaUpgradeStatus != upgraded {
addons = append(addons, addon)
}
for _, cluster := range rolloutClusters {
addons = append(addons, n.children[cluster])
}

return addons
}

func parseMaxConcurrency(maxConcurrency intstr.IntOrString, total int) (int, error) {
var length int

switch maxConcurrency.Type {
case intstr.String:
str := maxConcurrency.StrVal
f, err := strconv.ParseFloat(str[:len(str)-1], 64)
if err != nil {
return length, err
}
length = int(math.Ceil(f / 100 * float64(total)))
case intstr.Int:
length = maxConcurrency.IntValue()
default:
return length, fmt.Errorf("incorrect MaxConcurrency type %v", maxConcurrency.Type)
}

return length, nil
}

func desiredConfigsEqual(a, b addonConfigMap) bool {
if len(a) != len(b) {
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (d *clusterManagementAddonProgressingReconciler) reconcile(
setAddOnInstallProgressionsAndLastApplied(&cmaCopy.Status.InstallProgressions[i],
isUpgrade,
placementNode.addonUpgrading(),
placementNode.addonUpgraded(),
placementNode.addonUpgradeSucceed(),
len(placementNode.clusters),
)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/addon/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func RunManager(ctx context.Context, controllerContext *controllercmd.Controller

addonConfigurationController := addonconfiguration.NewAddonConfigurationController(
addonClient,
hubClusterClient,
addonInformerFactory.Addon().V1alpha1().ManagedClusterAddOns(),
addonInformerFactory.Addon().V1alpha1().ClusterManagementAddOns(),
clusterInformerFactory.Cluster().V1beta1().Placements(),
Expand Down

0 comments on commit c2b4cda

Please sign in to comment.