Skip to content

Commit

Permalink
rollouts: update core logic to be agnostic of exact cluster type (kpt…
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopherFry authored and droot committed Feb 9, 2023
1 parent 77d6d2d commit a388845
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 23 deletions.
14 changes: 7 additions & 7 deletions rollouts/controllers/rollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (r *RolloutReconciler) validateProgressiveRolloutStrategy(ctx context.Conte
}

clusterWaveMap := make(map[string]string)
for _, cluster := range allClusters.Items {
for _, cluster := range allClusters {
clusterWaveMap[cluster.Name] = ""
}

Expand All @@ -224,11 +224,11 @@ func (r *RolloutReconciler) validateProgressiveRolloutStrategy(ctx context.Conte
return err
}

if len(waveClusters.Items) == 0 {
if len(waveClusters) == 0 {
return fmt.Errorf("wave %q does not target any clusters", wave.Name)
}

for _, cluster := range waveClusters.Items {
for _, cluster := range waveClusters {
currentClusterWave, found := clusterWaveMap[cluster.Name]
if !found {
// this should never happen
Expand All @@ -245,7 +245,7 @@ func (r *RolloutReconciler) validateProgressiveRolloutStrategy(ctx context.Conte
pauseWaveNameFound = pauseWaveNameFound || pauseAfterWaveName == wave.Name
}

for _, cluster := range allClusters.Items {
for _, cluster := range allClusters {
wave, _ := clusterWaveMap[cluster.Name]
if wave == "" {
return fmt.Errorf("waves should cover all clusters selected by the rollout - cluster %s is not covered by any waves", cluster.Name)
Expand Down Expand Up @@ -283,7 +283,7 @@ func (r *RolloutReconciler) reconcileRollout(ctx context.Context, rollout *gitop
}
logger.Info("discovered packages", "count", len(discoveredPackages), "packages", discoveredPackages)

packageClusterMatcherClient := packageclustermatcher.NewPackageClusterMatcher(targetClusters.Items, discoveredPackages)
packageClusterMatcherClient := packageclustermatcher.NewPackageClusterMatcher(targetClusters, discoveredPackages)
clusterPackages, err := packageClusterMatcherClient.GetClusterPackages(rollout.Spec.PackageToTargetMatcher)
if err != nil {
return err
Expand Down Expand Up @@ -442,7 +442,7 @@ func (r *RolloutReconciler) getWaveTargets(ctx context.Context, rollout *gitopsv
return nil, err
}

for _, cluster := range waveClusters.Items {
for _, cluster := range waveClusters {
clusterNameToWaveTarget[cluster.Name] = &thisWaveTarget
}

Expand Down Expand Up @@ -624,7 +624,7 @@ type Targets struct {
}

type clusterPackagePair struct {
cluster *gkeclusterapis.ContainerCluster
cluster *clusterstore.Cluster
packageRef *packagediscovery.DiscoveredPackage
}

Expand Down
31 changes: 21 additions & 10 deletions rollouts/pkg/clusterstore/clusterstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,19 @@ type ClusterStore struct {
WorkloadIdentityHelper
}

type Cluster struct {
Name string
Labels map[string]string
}

func (cs *ClusterStore) Init() error {
if err := cs.WorkloadIdentityHelper.Init(cs.Config); err != nil {
return err
}
return nil
}

func (cs *ClusterStore) ListClusters(ctx context.Context, selectors ...*metav1.LabelSelector) (*gkeclusterapis.ContainerClusterList, error) {
func (cs *ClusterStore) ListClusters(ctx context.Context, selectors ...*metav1.LabelSelector) ([]Cluster, error) {
gkeClusters, err := cs.listClusters(ctx, selectors[0])
if err != nil {
return nil, err
Expand Down Expand Up @@ -79,17 +84,14 @@ func (cs *ClusterStore) ListClusters(ctx context.Context, selectors ...*metav1.L
return strings.Compare(gkeClusters.Items[i].Name, gkeClusters.Items[j].Name) == -1
})

return gkeClusters, nil
}
clusters := []Cluster{}

func (cs *ClusterStore) PrintClusterInfos(ctx context.Context, clusters *gkeclusterapis.ContainerClusterList) {
logger := log.FromContext(ctx)
for _, gkeCluster := range clusters.Items {
logger.Info("gke clusters", "namespace", gkeCluster.Namespace, "name", gkeCluster.Name)
for _, cond := range gkeCluster.Status.Conditions {
logger.Info("gke cluster", "name", gkeCluster.Name, "condition", cond)
}
for _, containerCluster := range gkeClusters.Items {
cluster := toCluster(&containerCluster)
clusters = append(clusters, cluster)
}

return clusters, nil
}

func (cs *ClusterStore) getCluster(ctx context.Context, name string) (*gkeclusterapis.ContainerCluster, error) {
Expand Down Expand Up @@ -199,3 +201,12 @@ func (cs *ClusterStore) getConfigConnectorContextTokenSource(ctx context.Context
}
return cs.WorkloadIdentityHelper.GetGcloudAccessTokenSource(ctx, kubeServiceAccount, googleServiceAccount)
}

func toCluster(containerCluster *gkeclusterapis.ContainerCluster) Cluster {
cluster := Cluster{
Name: containerCluster.Name,
Labels: containerCluster.Labels,
}

return cluster
}
12 changes: 6 additions & 6 deletions rollouts/pkg/packageclustermatcher/packageclustermatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,25 @@ package packageclustermatcher
import (
"fmt"

gkeclusterapis "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/clients/generated/apis/container/v1beta1"
gitopsv1alpha1 "github.com/GoogleContainerTools/kpt/rollouts/api/v1alpha1"
"github.com/GoogleContainerTools/kpt/rollouts/pkg/clusterstore"
"github.com/GoogleContainerTools/kpt/rollouts/pkg/packagediscovery"

"github.com/google/cel-go/cel"
"github.com/google/cel-go/checker/decls"
)

type PackageClusterMatcher struct {
clusters []gkeclusterapis.ContainerCluster
clusters []clusterstore.Cluster
packages []packagediscovery.DiscoveredPackage
}

type ClusterPackages struct {
Cluster gkeclusterapis.ContainerCluster
Cluster clusterstore.Cluster
Packages []packagediscovery.DiscoveredPackage
}

func NewPackageClusterMatcher(clusters []gkeclusterapis.ContainerCluster, packages []packagediscovery.DiscoveredPackage) *PackageClusterMatcher {
func NewPackageClusterMatcher(clusters []clusterstore.Cluster, packages []packagediscovery.DiscoveredPackage) *PackageClusterMatcher {
return &PackageClusterMatcher{
clusters: clusters,
packages: packages,
Expand All @@ -56,8 +56,8 @@ func (m *PackageClusterMatcher) GetClusterPackages(matcher gitopsv1alpha1.Packag
matchedPackages = packages
case gitopsv1alpha1.CustomMatcher:
celCluster := map[string]interface{}{
"name": cluster.ObjectMeta.Name,
"labels": cluster.ObjectMeta.Labels,
"name": cluster.Name,
"labels": cluster.Labels,
}

for _, discoveredPackage := range packages {
Expand Down

0 comments on commit a388845

Please sign in to comment.