diff --git a/rollouts/api/v1alpha1/rollout_types.go b/rollouts/api/v1alpha1/rollout_types.go
index 071ec82e15..dd068d1006 100644
--- a/rollouts/api/v1alpha1/rollout_types.go
+++ b/rollouts/api/v1alpha1/rollout_types.go
@@ -38,6 +38,8 @@ type RolloutSpec struct {
 
     // PackageToTargetMatcher specifies the clusters that will receive a specific package.
     PackageToTargetMatcher PackageToClusterMatcher `json:"packageToTargetMatcher"`
+    // Strategy specifies the rollout strategy to use for this rollout.
+    Strategy RolloutStrategy `json:"strategy"`
 }
 
 type ClusterTargetSelector struct {
@@ -88,10 +90,52 @@ type PackageToClusterMatcher struct {
     MatchExpression string `json:"matchExpression"`
 }
 
+// +kubebuilder:validation:Enum=AllAtOnce;Rolling;Progressive
+type StrategyType string
+
+const (
+    AllAtOnce   StrategyType = "AllAtOnce"
+    Rolling     StrategyType = "Rolling"
+    Progressive StrategyType = "Progressive"
+)
+
+type StrategyAllAtOnce struct{}
+
+type StrategyRolling struct {
+    MaxUnavailable int64 `json:"maxUnavailable"`
+}
+
+// StrategyProgressive allows staged rollouts
+// where the entire rollout will progress through different stages (aka steps, phases or waves).
+type StrategyProgressive struct{}
+
+type RolloutStrategy struct {
+    Type        StrategyType         `json:"type"`
+    AllAtOnce   *StrategyAllAtOnce   `json:"allAtOnce,omitempty"`
+    Rolling     *StrategyRolling     `json:"rolling,omitempty"`
+    Progressive *StrategyProgressive `json:"progressive,omitempty"`
+}
+
 // RolloutStatus defines the observed state of Rollout
 type RolloutStatus struct {
     // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
     // Important: Run "make" to regenerate code after modifying this file
+    ObservedGeneration int64 `json:"observedGeneration,omitempty"`
+
+    // Conditions describes the reconciliation state of the object.
+    Conditions []metav1.Condition `json:"conditions,omitempty"`
+
+    ClusterStatuses []ClusterStatus `json:"clusterStatuses,omitempty"`
+}
+
+type ClusterStatus struct {
+    Name          string        `json:"name"`
+    PackageStatus PackageStatus `json:"packageStatus"`
+}
+
+type PackageStatus struct {
+    PackageID  string `json:"packageId"`
+    SyncStatus string `json:"syncStatus"`
 }
 
 //+kubebuilder:object:root=true
diff --git a/rollouts/api/v1alpha1/zz_generated.deepcopy.go b/rollouts/api/v1alpha1/zz_generated.deepcopy.go
index a482431c12..29f4ef85d5 100644
--- a/rollouts/api/v1alpha1/zz_generated.deepcopy.go
+++ b/rollouts/api/v1alpha1/zz_generated.deepcopy.go
@@ -41,6 +41,22 @@ func (in *ClusterRef) DeepCopy() *ClusterRef {
     return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ClusterStatus) DeepCopyInto(out *ClusterStatus) {
+    *out = *in
+    out.PackageStatus = in.PackageStatus
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterStatus.
+func (in *ClusterStatus) DeepCopy() *ClusterStatus {
+    if in == nil {
+        return nil
+    }
+    out := new(ClusterStatus)
+    in.DeepCopyInto(out)
+    return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *ClusterTargetSelector) DeepCopyInto(out *ClusterTargetSelector) {
     *out = *in
@@ -110,6 +126,21 @@ func (in *GitSource) DeepCopy() *GitSource {
     return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PackageStatus) DeepCopyInto(out *PackageStatus) {
+    *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PackageStatus.
+func (in *PackageStatus) DeepCopy() *PackageStatus {
+    if in == nil {
+        return nil
+    }
+    out := new(PackageStatus)
+    in.DeepCopyInto(out)
+    return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *PackageToClusterMatcher) DeepCopyInto(out *PackageToClusterMatcher) {
     *out = *in
@@ -249,7 +280,7 @@ func (in *Rollout) DeepCopyInto(out *Rollout) {
     out.TypeMeta = in.TypeMeta
     in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
     in.Spec.DeepCopyInto(&out.Spec)
-    out.Status = in.Status
+    in.Status.DeepCopyInto(&out.Status)
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Rollout.
@@ -308,6 +339,7 @@ func (in *RolloutSpec) DeepCopyInto(out *RolloutSpec) {
     out.Packages = in.Packages
     in.Targets.DeepCopyInto(&out.Targets)
     out.PackageToTargetMatcher = in.PackageToTargetMatcher
+    in.Strategy.DeepCopyInto(&out.Strategy)
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RolloutSpec.
@@ -323,6 +355,18 @@
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *RolloutStatus) DeepCopyInto(out *RolloutStatus) {
     *out = *in
+    if in.Conditions != nil {
+        in, out := &in.Conditions, &out.Conditions
+        *out = make([]v1.Condition, len(*in))
+        for i := range *in {
+            (*in)[i].DeepCopyInto(&(*out)[i])
+        }
+    }
+    if in.ClusterStatuses != nil {
+        in, out := &in.ClusterStatuses, &out.ClusterStatuses
+        *out = make([]ClusterStatus, len(*in))
+        copy(*out, *in)
+    }
 }
 
 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RolloutStatus.
@@ -335,6 +379,36 @@ func (in *RolloutStatus) DeepCopy() *RolloutStatus {
     return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *RolloutStrategy) DeepCopyInto(out *RolloutStrategy) {
+    *out = *in
+    if in.AllAtOnce != nil {
+        in, out := &in.AllAtOnce, &out.AllAtOnce
+        *out = new(StrategyAllAtOnce)
+        **out = **in
+    }
+    if in.Rolling != nil {
+        in, out := &in.Rolling, &out.Rolling
+        *out = new(StrategyRolling)
+        **out = **in
+    }
+    if in.Progressive != nil {
+        in, out := &in.Progressive, &out.Progressive
+        *out = new(StrategyProgressive)
+        **out = **in
+    }
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RolloutStrategy.
+func (in *RolloutStrategy) DeepCopy() *RolloutStrategy {
+    if in == nil {
+        return nil
+    }
+    out := new(RolloutStrategy)
+    in.DeepCopyInto(out)
+    return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *RootSyncInfo) DeepCopyInto(out *RootSyncInfo) {
     *out = *in
@@ -389,3 +463,48 @@ func (in *SecretReference) DeepCopy() *SecretReference {
     in.DeepCopyInto(out)
     return out
 }
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *StrategyAllAtOnce) DeepCopyInto(out *StrategyAllAtOnce) {
+    *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StrategyAllAtOnce.
+func (in *StrategyAllAtOnce) DeepCopy() *StrategyAllAtOnce {
+    if in == nil {
+        return nil
+    }
+    out := new(StrategyAllAtOnce)
+    in.DeepCopyInto(out)
+    return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *StrategyProgressive) DeepCopyInto(out *StrategyProgressive) {
+    *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StrategyProgressive.
+func (in *StrategyProgressive) DeepCopy() *StrategyProgressive {
+    if in == nil {
+        return nil
+    }
+    out := new(StrategyProgressive)
+    in.DeepCopyInto(out)
+    return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *StrategyRolling) DeepCopyInto(out *StrategyRolling) {
+    *out = *in
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StrategyRolling.
+func (in *StrategyRolling) DeepCopy() *StrategyRolling {
+    if in == nil {
+        return nil
+    }
+    out := new(StrategyRolling)
+    in.DeepCopyInto(out)
+    return out
+}
diff --git a/rollouts/config/crd/bases/gitops.kpt.dev_rollouts.yaml b/rollouts/config/crd/bases/gitops.kpt.dev_rollouts.yaml
index 31215c7acc..eff2decf17 100644
--- a/rollouts/config/crd/bases/gitops.kpt.dev_rollouts.yaml
+++ b/rollouts/config/crd/bases/gitops.kpt.dev_rollouts.yaml
@@ -95,6 +95,34 @@ spec:
                 - git
                 - sourceType
                 type: object
+              strategy:
+                description: Strategy specifies the rollout strategy to use for this
+                  rollout.
+                properties:
+                  allAtOnce:
+                    type: object
+                  progressive:
+                    description: StrategyProgressive allows staged rollouts where
+                      the entire rollout will progress through different stages (aka
+                      steps, phases or waves).
+                    type: object
+                  rolling:
+                    properties:
+                      maxUnavailable:
+                        format: int64
+                        type: integer
+                    required:
+                    - maxUnavailable
+                    type: object
+                  type:
+                    enum:
+                    - AllAtOnce
+                    - Rolling
+                    - Progressive
+                    type: string
+                required:
+                - type
+                type: object
               targets:
                 description: Targets specifies the clusters that will receive the
                   KRM config packages.
@@ -151,9 +179,107 @@ spec:
             required:
             - packageToTargetMatcher
            - packages
+            - strategy
             type: object
           status:
             description: RolloutStatus defines the observed state of Rollout
+            properties:
+              clusterStatuses:
+                items:
+                  properties:
+                    name:
+                      type: string
+                    packageStatus:
+                      properties:
+                        packageId:
+                          type: string
+                        syncStatus:
+                          type: string
+                      required:
+                      - packageId
+                      - syncStatus
+                      type: object
+                  required:
+                  - name
+                  - packageStatus
+                  type: object
+                type: array
+              conditions:
+                description: Conditions describes the reconciliation state of the
+                  object.
+                items:
+                  description: "Condition contains details for one aspect of the current
+                    state of this API Resource. --- This struct is intended for direct
+                    use as an array at the field path .status.conditions. For example,
+                    \n type FooStatus struct{ // Represents the observations of a
+                    foo's current state. // Known .status.conditions.type are: \"Available\",
+                    \"Progressing\", and \"Degraded\" // +patchMergeKey=type // +patchStrategy=merge
+                    // +listType=map // +listMapKey=type Conditions []metav1.Condition
+                    `json:\"conditions,omitempty\" patchStrategy:\"merge\" patchMergeKey:\"type\"
+                    protobuf:\"bytes,1,rep,name=conditions\"` \n // other fields }"
+                  properties:
+                    lastTransitionTime:
+                      description: lastTransitionTime is the last time the condition
+                        transitioned from one status to another. This should be when
+                        the underlying condition changed. If that is not known, then
+                        using the time when the API field changed is acceptable.
+                      format: date-time
+                      type: string
+                    message:
+                      description: message is a human readable message indicating
+                        details about the transition. This may be an empty string.
+                      maxLength: 32768
+                      type: string
+                    observedGeneration:
+                      description: observedGeneration represents the .metadata.generation
+                        that the condition was set based upon. For instance, if .metadata.generation
+                        is currently 12, but the .status.conditions[x].observedGeneration
+                        is 9, the condition is out of date with respect to the current
+                        state of the instance.
+                      format: int64
+                      minimum: 0
+                      type: integer
+                    reason:
+                      description: reason contains a programmatic identifier indicating
+                        the reason for the condition's last transition. Producers
+                        of specific condition types may define expected values and
+                        meanings for this field, and whether the values are considered
+                        a guaranteed API. The value should be a CamelCase string.
+                        This field may not be empty.
+                      maxLength: 1024
+                      minLength: 1
+                      pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
+                      type: string
+                    status:
+                      description: status of the condition, one of True, False, Unknown.
+                      enum:
+                      - "True"
+                      - "False"
+                      - Unknown
+                      type: string
+                    type:
+                      description: type of condition in CamelCase or in foo.example.com/CamelCase.
+                        --- Many .condition.type values are consistent across resources
+                        like Available, but because arbitrary conditions can be useful
+                        (see .node.status.conditions), the ability to deconflict is
+                        important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt)
+                      maxLength: 316
+                      pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
+                      type: string
+                  required:
+                  - lastTransitionTime
+                  - message
+                  - reason
+                  - status
+                  - type
+                  type: object
+                type: array
+              observedGeneration:
+                description: 'INSERT ADDITIONAL STATUS FIELD - define observed state
+                  of cluster Important: Run "make" to regenerate code after modifying
+                  this file'
+                format: int64
+                type: integer
             type: object
         type: object
     served: true
diff --git a/rollouts/config/samples/gitops_v1alpha1_rollout.yaml b/rollouts/config/samples/gitops_v1alpha1_rollout.yaml
index 52023525e1..b9242e9f7e 100644
--- a/rollouts/config/samples/gitops_v1alpha1_rollout.yaml
+++ b/rollouts/config/samples/gitops_v1alpha1_rollout.yaml
@@ -28,14 +28,18 @@ spec:
     sourceType: git
     git:
       selector:
-        org: GoogleContainerTools
-        repo: kpt-samples
-        directory: "*"
+        org: droot
+        repo: oahu
+        directory: namespaces
         revision: main
-  targets:
-    selector:
-      matchExpressions:
-      - {key: location/island, operator: In, values: [oahu, maui]}
   packageToTargetMatcher:
     type: CEL
     matchExpression: 'true'
+  targets:
+    selector:
+      matchLabels:
+        env: staging
+#      matchExpressions:
+#      - {key: env, operator: In, values: [dev, staging]}
+  strategy:
+    type: AllAtOnce
diff --git a/rollouts/controllers/rollout_controller.go b/rollouts/controllers/rollout_controller.go
index 414430056f..62e14c156a 100644
--- a/rollouts/controllers/rollout_controller.go
+++ b/rollouts/controllers/rollout_controller.go
@@ -19,11 +19,16 @@ package controllers
 import (
     "context"
     "flag"
+    "fmt"
 
     v1 "k8s.io/api/core/v1"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/klog"
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log" gkeclusterapis "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/clients/generated/apis/container/v1beta1" @@ -42,6 +47,10 @@ func (o *Options) InitDefaults() { func (o *Options) BindFlags(prefix string, flags *flag.FlagSet) { } +const ( + rolloutLabel = "gitops.kpt.dev/rollout-name" +) + // RolloutReconciler reconciles a Rollout object type RolloutReconciler struct { client.Client @@ -69,63 +78,254 @@ type RolloutReconciler struct { // Fetch the READY kcc clusters. // For each kcc cluster, fetch RootSync objects in each of the KCC clusters. func (r *RolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) var rollout gitopsv1alpha1.Rollout + logger.Info("reconciling", "key", req.NamespacedName) + if err := r.Get(ctx, req.NamespacedName, &rollout); err != nil { + logger.Error(err, "unable to fetch Rollout") + // we'll ignore not-found errors, since they can't be fixed by an immediate + // requeue (we'll need to wait for a new notification), and we can get them + // on deleted requests. return ctrl.Result{}, client.IgnoreNotFound(err) } - logger := log.FromContext(ctx) - - logger.Info("reconciling", "key", req.NamespacedName) + myFinalizerName := "gitops.kpt.dev/rollouts" + if rollout.ObjectMeta.DeletionTimestamp.IsZero() { + // The object is not being deleted, so if it does not have our finalizer, + // then lets add the finalizer and update the object. This is equivalent + // registering our finalizer. + if !controllerutil.ContainsFinalizer(&rollout, myFinalizerName) { + controllerutil.AddFinalizer(&rollout, myFinalizerName) + if err := r.Update(ctx, &rollout); err != nil { + return ctrl.Result{}, fmt.Errorf("error adding finalizer: %w", err) + } + } + } else { + // The object is being deleted + if controllerutil.ContainsFinalizer(&rollout, myFinalizerName) { + // remove our finalizer from the list and update it. + controllerutil.RemoveFinalizer(&rollout, myFinalizerName) + if err := r.Update(ctx, &rollout); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update %s after delete finalizer: %w", req.Name, err) + } + } + // Stop reconciliation as the item is being deleted + return ctrl.Result{}, nil + } - gkeClusters, err := r.store.ListClusters(ctx, rollout.Spec.Targets.Selector) + err := r.reconcileRollout(ctx, &rollout) if err != nil { return ctrl.Result{}, err } - r.store.PrintClusterInfos(ctx, gkeClusters) + return ctrl.Result{}, nil +} - for _, gkeCluster := range gkeClusters.Items { - cl, _, err := r.store.GetClusterClient(ctx, &gkeCluster) - if err != nil { - return ctrl.Result{}, err - } - r.testClusterClient(ctx, cl) - } +func (r *RolloutReconciler) reconcileRollout(ctx context.Context, rollout *gitopsv1alpha1.Rollout) error { + logger := log.FromContext(ctx) - if err := r.Get(ctx, req.NamespacedName, &rollout); err != nil { - logger.Error(err, "unable to fetch Rollout") - // we'll ignore not-found errors, since they can't be fixed by an immediate - // requeue (we'll need to wait for a new notification), and we can get them - // on deleted requests. 
-        return ctrl.Result{}, client.IgnoreNotFound(err)
+    clusters, err := r.store.ListClusters(ctx, rollout.Spec.Targets.Selector)
+    if err != nil {
+        return err
     }
+    logger.Info("discovered clusters", "count", len(clusters.Items))
 
-    packageDiscoveryClient := packagediscovery.NewPackageDiscovery(rollout.Spec.Packages, r.Client, req.Namespace)
+    packageDiscoveryClient := packagediscovery.NewPackageDiscovery(rollout.Spec.Packages, r.Client, rollout.Namespace)
 
     discoveredPackages, err := packageDiscoveryClient.GetPackages(ctx)
     if err != nil {
-        logger.Error(err, "package discovery failed")
-        return ctrl.Result{}, client.IgnoreNotFound(err)
+        logger.Error(err, "failed to discover packages")
+        return client.IgnoreNotFound(err)
     }
+    logger.Info("discovered packages", "count", len(discoveredPackages), "packages", discoveredPackages)
 
-    packageClusterMatcherClient := packageclustermatcher.NewPackageClusterMatcher(gkeClusters.Items, discoveredPackages)
+    packageClusterMatcherClient := packageclustermatcher.NewPackageClusterMatcher(clusters.Items, discoveredPackages)
     allClusterPackages, err := packageClusterMatcherClient.GetClusterPackages(rollout.Spec.PackageToTargetMatcher)
-
     if err != nil {
         logger.Error(err, "get cluster packages failed")
-        return ctrl.Result{}, client.IgnoreNotFound(err)
+        return client.IgnoreNotFound(err)
     }
 
     for _, clusterPackages := range allClusterPackages {
         clusterName := clusterPackages.Cluster.Name
-
         logger.Info("cluster packages", "cluster", clusterName, "packagesCount", len(clusterPackages.Packages), "packages", clusterPackages.Packages)
     }
 
-    return ctrl.Result{}, err
+    targets, err := r.computeTargets(ctx, rollout, allClusterPackages)
+    if err != nil {
+        return err
+    }
+
+    clusterStatuses, err := r.rolloutTargets(ctx, rollout, targets)
+    if err != nil {
+        return err
+    }
+
+    if err := r.updateStatus(ctx, rollout, clusterStatuses); err != nil {
+        return err
+    }
+    return nil
+}
+
+func (r *RolloutReconciler) updateStatus(ctx context.Context, rollout *gitopsv1alpha1.Rollout, clusterStatuses []gitopsv1alpha1.ClusterStatus) error {
+    logger := log.FromContext(ctx)
+    logger.Info("updating the status", "cluster statuses", len(clusterStatuses))
+    rollout.Status.ClusterStatuses = clusterStatuses
+    rollout.Status.ObservedGeneration = rollout.Generation
+    return r.Client.Status().Update(ctx, rollout)
+}
+
+/*
+We compute targets, where each target consists of a (cluster, package) pair,
+and then compute the RRS corresponding to each (cluster, package) pair.
+For each RRS, the name has to be a function of the cluster-id and the package-id.
+For each RRS, we also build the rootSyncTemplate.
+*/
+func (r *RolloutReconciler) computeTargets(ctx context.Context,
+    rollout *gitopsv1alpha1.Rollout,
+    clusterPackages []packageclustermatcher.ClusterPackages) (*Targets, error) {
+
+    RRSkeysToBeDeleted := map[client.ObjectKey]*gitopsv1alpha1.RemoteRootSync{}
+    // let's take a look at existing remoterootsyncs
+    existingRRSs, err := r.listRemoteRootSyncs(ctx, rollout.Name, rollout.Namespace)
+    if err != nil {
+        return nil, err
+    }
+    // initially assume all the keys to be deleted
+    for _, rrs := range existingRRSs {
+        RRSkeysToBeDeleted[client.ObjectKeyFromObject(rrs)] = rrs
+    }
+    klog.Infof("Found remoterootsyncs: %s", toRemoteRootSyncNames(existingRRSs))
+    targets := &Targets{}
+    // track keys of all the desired remote rootsyncs
+    for _, clusterPkg := range clusterPackages {
+        // TODO: figure out multiple packages per cluster story
+        if len(clusterPkg.Packages) < 1 {
+            continue
+        }
+        cluster := &clusterPkg.Cluster
+        pkg := &clusterPkg.Packages[0]
+        rrs := gitopsv1alpha1.RemoteRootSync{}
+        key := client.ObjectKey{
+            Namespace: rollout.Namespace,
+            Name:      fmt.Sprintf("%s-%s", pkgID(pkg), cluster.Name),
+        }
+        // since this RRS needs to exist, remove it from the deletion list
+        delete(RRSkeysToBeDeleted, key)
+        // check whether the remoterootsync for this package already exists
+        err := r.Client.Get(ctx, key, &rrs)
+        if err != nil {
+            if apierrors.IsNotFound(err) { // rrs is missing
+                targets.ToBeCreated = append(targets.ToBeCreated, &clusterPackagePair{
+                    cluster:    cluster,
+                    packageRef: pkg,
+                })
+            } else {
+                // some other error encountered
+                return nil, err
+            }
+        } else {
+            // remoterootsync already exists
+            if pkg.Revision != rrs.Spec.Template.Spec.Git.Revision {
+                rrs.Spec.Template.Spec.Git.Revision = pkg.Revision
+                // revision has been updated
+                targets.ToBeUpdated = append(targets.ToBeUpdated, &rrs)
+            } else {
+                targets.Unchanged = append(targets.Unchanged, &rrs)
+            }
+        }
+    }
+
+    for _, rrs := range RRSkeysToBeDeleted {
+        targets.ToBeDeleted = append(targets.ToBeDeleted, rrs)
+    }
+
+    return targets, nil
+}
+
+func (r *RolloutReconciler) rolloutTargets(ctx context.Context, rollout *gitopsv1alpha1.Rollout, targets *Targets) ([]gitopsv1alpha1.ClusterStatus, error) {
+
+    clusterStatuses := []gitopsv1alpha1.ClusterStatus{}
+
+    if rollout.Spec.Strategy.Type != gitopsv1alpha1.AllAtOnce {
+        return clusterStatuses, fmt.Errorf("%v strategy not supported yet", rollout.Spec.Strategy.Type)
+    }
+
+    for _, target := range targets.ToBeCreated {
+        rootSyncSpec := toRootSyncSpec(target.packageRef)
+        rrs := newRemoteRootSync(rollout,
+            gitopsv1alpha1.ClusterRef{Name: target.cluster.Name},
+            rootSyncSpec,
+            pkgID(target.packageRef),
+        )
+        if err := r.Create(ctx, rrs); err != nil {
+            klog.Warningf("Error creating RemoteRootSync %s: %v", rrs.Name, err)
+            return nil, err
+        }
+        clusterStatuses = append(clusterStatuses, gitopsv1alpha1.ClusterStatus{
+            Name: rrs.Spec.ClusterRef.Name,
+            PackageStatus: gitopsv1alpha1.PackageStatus{
+                PackageID:  rrs.Name,
+                SyncStatus: rrs.Status.SyncStatus,
+            },
+        })
+    }
+
+    for _, target := range targets.ToBeUpdated {
+        if err := r.Update(ctx, target); err != nil {
+            klog.Warningf("Error updating RemoteRootSync %s: %v", target.Name, err)
+            return nil, err
+        }
+        clusterStatuses = append(clusterStatuses, gitopsv1alpha1.ClusterStatus{
+            Name: target.Spec.ClusterRef.Name,
+            PackageStatus: gitopsv1alpha1.PackageStatus{
+                PackageID:  target.Name,
+                SyncStatus: target.Status.SyncStatus,
+            },
+        })
+    }
+
+    for _, target := range targets.ToBeDeleted {
+        if err := r.Delete(ctx, target); err != nil {
+            klog.Warningf("Error deleting RemoteRootSync %s: %v", target.Name, err)
+            return nil, err
+        }
+    }
+
+    for _, target := range targets.Unchanged {
+        clusterStatuses = append(clusterStatuses, gitopsv1alpha1.ClusterStatus{
+            Name: target.Spec.ClusterRef.Name,
+            PackageStatus: gitopsv1alpha1.PackageStatus{
+                PackageID:  target.Name,
+                SyncStatus: target.Status.SyncStatus,
+            },
+        })
+    }
+
+    return clusterStatuses, nil
+}
+
+type Targets struct {
+    ToBeCreated []*clusterPackagePair
+    ToBeUpdated []*gitopsv1alpha1.RemoteRootSync
+    ToBeDeleted []*gitopsv1alpha1.RemoteRootSync
+    Unchanged   []*gitopsv1alpha1.RemoteRootSync
+}
+
+type clusterPackagePair struct {
+    cluster    *gkeclusterapis.ContainerCluster
+    packageRef *packagediscovery.DiscoveredPackage
+}
+
+func toRemoteRootSyncNames(rsss []*gitopsv1alpha1.RemoteRootSync) []string {
+    var names []string
+    for _, rss := range rsss {
+        names = append(names, rss.Name)
+    }
+    return names
 }
 
 func (r *RolloutReconciler) testClusterClient(ctx context.Context, cl client.Client) error {
@@ -139,10 +339,85 @@ func (r *RolloutReconciler) testClusterClient(ctx context.Context, cl client.Cli
     return nil
 }
 
+func (r *RolloutReconciler) listRemoteRootSyncs(ctx context.Context, rsdName, rsdNamespace string) ([]*gitopsv1alpha1.RemoteRootSync, error) {
+    var list gitopsv1alpha1.RemoteRootSyncList
+    if err := r.List(ctx, &list, client.MatchingLabels{rolloutLabel: rsdName}, client.InNamespace(rsdNamespace)); err != nil {
+        return nil, err
+    }
+    var remoterootsyncs []*gitopsv1alpha1.RemoteRootSync
+    for i := range list.Items {
+        item := &list.Items[i]
+        remoterootsyncs = append(remoterootsyncs, item)
+    }
+    return remoterootsyncs, nil
+}
+
+func isRRSSynced(rss *gitopsv1alpha1.RemoteRootSync) bool {
+    if rss.Generation != rss.Status.ObservedGeneration {
+        return false
+    }
+
+    if rss.Status.SyncStatus == "Synced" {
+        return true
+    }
+    return false
+}
+
+// Given a package identifier and cluster, create a RemoteRootSync object.
+func newRemoteRootSync(rollout *gitopsv1alpha1.Rollout, clusterRef gitopsv1alpha1.ClusterRef, rssSpec *gitopsv1alpha1.RootSyncSpec, pkgID string) *gitopsv1alpha1.RemoteRootSync {
+    t := true
+    return &gitopsv1alpha1.RemoteRootSync{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      fmt.Sprintf("%s-%s", pkgID, clusterRef.Name),
+            Namespace: rollout.Namespace,
+            Labels: map[string]string{
+                rolloutLabel: rollout.Name,
+            },
+            OwnerReferences: []metav1.OwnerReference{
+                {
+                    APIVersion: rollout.APIVersion,
+                    Kind:       rollout.Kind,
+                    Name:       rollout.Name,
+                    UID:        rollout.UID,
+                    Controller: &t,
+                },
+            },
+        },
+        Spec: gitopsv1alpha1.RemoteRootSyncSpec{
+            ClusterRef: clusterRef,
+            Template: &gitopsv1alpha1.RootSyncInfo{
+                Spec: rssSpec,
+            },
+        },
+    }
+}
+
+func toRootSyncSpec(dpkg *packagediscovery.DiscoveredPackage) *gitopsv1alpha1.RootSyncSpec {
+    return &gitopsv1alpha1.RootSyncSpec{
+        SourceFormat: "unstructured",
+        Git: &gitopsv1alpha1.GitInfo{
+            Repo:     fmt.Sprintf("https://github.com/%s/%s.git", dpkg.Org, dpkg.Repo),
+            Revision: dpkg.Revision,
+            Dir:      dpkg.Directory,
+            Branch:   "main",
+            Auth:     "none",
+        },
+    }
+}
+
+func pkgID(dpkg *packagediscovery.DiscoveredPackage) string {
+    return fmt.Sprintf("%s-%s-%s", dpkg.Org, dpkg.Repo, dpkg.Directory)
+}
+
 // SetupWithManager sets up the controller with the Manager.
 func (r *RolloutReconciler) SetupWithManager(mgr ctrl.Manager) error {
+    if err := gkeclusterapis.AddToScheme(mgr.GetScheme()); err != nil {
+        return err
+    }
+    if err := gitopsv1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
+        return err
+    }
     r.Client = mgr.GetClient()
-    gkeclusterapis.AddToScheme(mgr.GetScheme())
 
     // setup the clusterstore
     r.store = &clusterstore.ClusterStore{
@@ -152,7 +427,9 @@ func (r *RolloutReconciler) SetupWithManager(mgr ctrl.Manager) error {
     if err := r.store.Init(); err != nil {
         return err
     }
+    // TODO: watch cluster resources as well
    return ctrl.NewControllerManagedBy(mgr).
        For(&gitopsv1alpha1.Rollout{}).
+        Owns(&gitopsv1alpha1.RemoteRootSync{}).
        Complete(r)
 }
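
Note (illustrative sketch, not part of the patch): with this change applied, a Rollout that exercises the new strategy field, and the status this controller now writes back, might look roughly as follows. The cluster name, metadata, and the packageId/syncStatus values are hypothetical; packageId mirrors the org-repo-directory-cluster naming produced by pkgID() and newRemoteRootSync().

apiVersion: gitops.kpt.dev/v1alpha1
kind: Rollout
metadata:
  name: sample          # hypothetical name/namespace
  namespace: default
spec:
  packages:
    sourceType: git
    git:
      selector:
        org: droot
        repo: oahu
        directory: namespaces
        revision: main
  packageToTargetMatcher:
    type: CEL
    matchExpression: 'true'
  targets:
    selector:
      matchLabels:
        env: staging
  strategy:
    type: AllAtOnce
status:
  observedGeneration: 1
  clusterStatuses:
  - name: staging-cluster-1        # hypothetical cluster name
    packageStatus:
      packageId: droot-oahu-namespaces-staging-cluster-1
      syncStatus: Synced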