From 27811337868d14f262efb7f30290ea5b83e0de7a Mon Sep 17 00:00:00 2001 From: Morten Torkildsen Date: Fri, 21 Oct 2022 09:05:37 -0700 Subject: [PATCH] Support for checking sync status for RootSyncSet (#3625) --- .../config.porch.kpt.dev_rootsyncsets.yaml | 17 +- porch/controllers/main.go | 2 +- .../api/v1alpha1/rootsyncset_types.go | 12 +- .../api/v1alpha1/zz_generated.deepcopy.go | 20 ++ .../pkg/controllers/rootsyncset/controller.go | 340 +++++++++++++++--- .../pkg/controllers/rootsyncset/status.go | 134 +++++++ .../pkg/controllers/rootsyncset/watcher.go | 145 ++++++++ 7 files changed, 620 insertions(+), 50 deletions(-) create mode 100644 porch/controllers/rootsyncsets/pkg/controllers/rootsyncset/status.go create mode 100644 porch/controllers/rootsyncsets/pkg/controllers/rootsyncset/watcher.go diff --git a/porch/controllers/config/crd/bases/config.porch.kpt.dev_rootsyncsets.yaml b/porch/controllers/config/crd/bases/config.porch.kpt.dev_rootsyncsets.yaml index 2ac1a38592..08d2dfa70f 100644 --- a/porch/controllers/config/crd/bases/config.porch.kpt.dev_rootsyncsets.yaml +++ b/porch/controllers/config/crd/bases/config.porch.kpt.dev_rootsyncsets.yaml @@ -60,8 +60,6 @@ spec: type: string namespace: type: string - required: - - namespace type: object type: array template: @@ -108,6 +106,21 @@ spec: status: description: RootSyncSetStatus defines the observed state of RootSyncSet properties: + clusterRefStatuses: + items: + properties: + apiVersion: + type: string + kind: + type: string + name: + type: string + namespace: + type: string + syncStatus: + type: string + type: object + type: array conditions: description: Conditions describes the reconciliation state of the object. diff --git a/porch/controllers/main.go b/porch/controllers/main.go index eb8524a792..ac84080c17 100644 --- a/porch/controllers/main.go +++ b/porch/controllers/main.go @@ -49,7 +49,7 @@ import ( var ( reconcilers = map[string]newReconciler{ "rootsyncsets": func() Reconciler { - return &rootsyncset.RootSyncSetReconciler{} + return rootsyncset.NewRootSyncSetReconciler() }, "remoterootsyncsets": func() Reconciler { return &remoterootsyncset.RemoteRootSyncSetReconciler{} diff --git a/porch/controllers/rootsyncsets/api/v1alpha1/rootsyncset_types.go b/porch/controllers/rootsyncsets/api/v1alpha1/rootsyncset_types.go index a67cadcca0..963e487137 100644 --- a/porch/controllers/rootsyncsets/api/v1alpha1/rootsyncset_types.go +++ b/porch/controllers/rootsyncsets/api/v1alpha1/rootsyncset_types.go @@ -28,7 +28,7 @@ type ClusterRef struct { ApiVersion string `json:"apiVersion,omitempty"` Kind string `json:"kind,omitempty"` Name string `json:"name,omitempty"` - Namespace string `json:"namespace,'omitempty'"` + Namespace string `json:"namespace,omitempty"` } type RootSyncInfo struct { @@ -65,6 +65,16 @@ type SecretReference struct { type RootSyncSetStatus struct { // Conditions describes the reconciliation state of the object. Conditions []metav1.Condition `json:"conditions,omitempty"` + + ClusterRefStatuses []ClusterRefStatus `json:"clusterRefStatuses,omitempty"` +} + +type ClusterRefStatus struct { + ApiVersion string `json:"apiVersion,omitempty"` + Kind string `json:"kind,omitempty"` + Name string `json:"name,omitempty"` + Namespace string `json:"namespace,omitempty"` + SyncStatus string `json:"syncStatus,omitempty"` } //+kubebuilder:object:root=true diff --git a/porch/controllers/rootsyncsets/api/v1alpha1/zz_generated.deepcopy.go b/porch/controllers/rootsyncsets/api/v1alpha1/zz_generated.deepcopy.go index 064f3b4279..3ded92aa35 100644 --- a/porch/controllers/rootsyncsets/api/v1alpha1/zz_generated.deepcopy.go +++ b/porch/controllers/rootsyncsets/api/v1alpha1/zz_generated.deepcopy.go @@ -39,6 +39,21 @@ func (in *ClusterRef) DeepCopy() *ClusterRef { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ClusterRefStatus) DeepCopyInto(out *ClusterRefStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClusterRefStatus. +func (in *ClusterRefStatus) DeepCopy() *ClusterRefStatus { + if in == nil { + return nil + } + out := new(ClusterRefStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GitInfo) DeepCopyInto(out *GitInfo) { *out = *in @@ -176,6 +191,11 @@ func (in *RootSyncSetStatus) DeepCopyInto(out *RootSyncSetStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.ClusterRefStatuses != nil { + in, out := &in.ClusterRefStatuses, &out.ClusterRefStatuses + *out = make([]ClusterRefStatus, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RootSyncSetStatus. diff --git a/porch/controllers/rootsyncsets/pkg/controllers/rootsyncset/controller.go b/porch/controllers/rootsyncsets/pkg/controllers/rootsyncset/controller.go index c5b0a0ca84..b57d9234a4 100644 --- a/porch/controllers/rootsyncsets/pkg/controllers/rootsyncset/controller.go +++ b/porch/controllers/rootsyncsets/pkg/controllers/rootsyncset/controller.go @@ -19,6 +19,8 @@ import ( "encoding/base64" "encoding/json" "fmt" + "strings" + "sync" container "cloud.google.com/go/container/apiv1" "github.com/GoogleContainerTools/kpt/porch/controllers/rootsyncsets/api/v1alpha1" @@ -26,6 +28,7 @@ import ( "golang.org/x/oauth2" "google.golang.org/api/option" containerpb "google.golang.org/genproto/googleapis/container/v1" + "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -38,19 +41,53 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" ) var ( - rootSyncNamespace = "config-management-system" - rootSyncApiVersion = "configsync.gke.io/v1beta1" - rootSyncKind = "RootSync" + rootSyncNamespace = "config-management-system" + rootSyncGVK = schema.GroupVersionKind{ + Group: "configsync.gke.io", + Version: "v1beta1", + Kind: "RootSync", + } + rootSyncGVR = schema.GroupVersionResource{ + Group: "configsync.gke.io", + Version: "v1beta1", + Resource: "rootsyncs", + } + rootSyncSetNameLabel = "config.porch.kpt.dev/rootsyncset-name" + rootSyncSetNamespaceLabel = "config.porch.kpt.dev/rootsyncset-namespace" + + containerClusterKind = "ContainerCluster" + containerClusterApiVersion = "container.cnrm.cloud.google.com/v1beta1" + + configControllerKind = "ConfigControllerInstance" + configControllerApiVersion = "configcontroller.cnrm.cloud.google.com/v1beta1" ) +func NewRootSyncSetReconciler() *RootSyncSetReconciler { + return &RootSyncSetReconciler{ + channel: make(chan event.GenericEvent, 10), + watchers: make(map[v1alpha1.ClusterRef]*watcher), + } +} + // RootSyncSetReconciler reconciles a RootSyncSet object type RootSyncSetReconciler struct { client.Client WorkloadIdentityHelper + + // channel is where watchers put events to trigger new reconcilations based + // on watch events from target clusters. + channel chan event.GenericEvent + + mutex sync.Mutex + watchers map[v1alpha1.ClusterRef]*watcher } //go:generate go run sigs.k8s.io/controller-tools/cmd/controller-gen@v0.8.0 rbac:roleName=porch-controllers-rootsyncsets webhook paths="." output:rbac:artifacts:config=../../../config/rbac @@ -79,6 +116,7 @@ func (r *RootSyncSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err := r.Get(ctx, req.NamespacedName, &rootsyncset); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } + myFinalizerName := "config.porch.kpt.dev/finalizer" if rootsyncset.ObjectMeta.DeletionTimestamp.IsZero() { // The object is not being deleted, so if it does not have our finalizer, @@ -99,6 +137,8 @@ func (r *RootSyncSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) // so that it can be retried return ctrl.Result{}, fmt.Errorf("have problem to delete external resource: %w", err) } + // Make sure we stop any watches that are no longer needed. + r.pruneWatches(req.NamespacedName, []*v1alpha1.ClusterRef{}) // remove our finalizer from the list and update it. controllerutil.RemoveFinalizer(&rootsyncset, myFinalizerName) if err := r.Update(ctx, &rootsyncset); err != nil { @@ -108,59 +148,246 @@ func (r *RootSyncSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Stop reconciliation as the item is being deleted return ctrl.Result{}, nil } - var patchErrs []error - for _, clusterRef := range rootsyncset.Spec.ClusterRefs { - clusterRefName := clusterRef.Kind + ":" + clusterRef.Name - client, err := r.GetClient(ctx, clusterRef, rootsyncset.Namespace) + + results := make(reconcileResult) + for _, cr := range rootsyncset.Spec.ClusterRefs { + result := clusterRefResult{} + clusterRefName := cr.Kind + ":" + cr.Name + clusterRef, err := toCanonicalClusterRef(cr, rootsyncset.Namespace) if err != nil { - patchErrs = append(patchErrs, err) + result.clientError = err + results[clusterRefName] = result continue } - rootSyncRes, newRootSync, err := BuildObjectsToApply(&rootsyncset) + client, err := r.GetClient(ctx, clusterRef, rootsyncset.Namespace) if err != nil { - patchErrs = append(patchErrs, err) + result.clientError = err + results[clusterRefName] = result continue } - data, err := json.Marshal(newRootSync) - if err != nil { - patchErrs = append(patchErrs, fmt.Errorf("failed to encode root sync to JSON: %w", err)) - continue + r.setupWatches(ctx, client, rootsyncset.Name, rootsyncset.Namespace, clusterRef) + + if err := r.patchRootSync(ctx, client, req.Name, &rootsyncset); err != nil { + result.patchError = err } - rs, err := client.Resource(rootSyncRes).Namespace(rootSyncNamespace).Patch(ctx, req.Name, types.ApplyPatchType, data, metav1.PatchOptions{FieldManager: req.Name}) + + s, err := checkSyncStatus(ctx, client, req.Name) if err != nil { - patchErrs = append(patchErrs, fmt.Errorf("failed to patch RootSync %s in cluster %s: %w", rootSyncNamespace+"/"+req.Name, clusterRefName, err)) + result.statusError = err + result.status = "Unknown" } else { - klog.Infof("Create/Update resource %s as %v", req.Name, rs) + result.status = s } + + results[clusterRefName] = result } - if len(patchErrs) != 0 { - for _, patchErr := range patchErrs { - klog.Errorf("%v", patchErr) - } - return ctrl.Result{}, patchErrs[0] + + r.pruneWatches(req.NamespacedName, rootsyncset.Spec.ClusterRefs) + + if err := r.updateStatus(ctx, &rootsyncset, results); err != nil { + klog.Errorf("failed to update status: %w", err) + return ctrl.Result{}, err + } + + if errs := results.Errors(); len(errs) > 0 { + klog.Warningf("Errors: %s", results.Error()) + return ctrl.Result{}, results } + return ctrl.Result{}, nil } -// BuildObjectsToApply config root sync -func BuildObjectsToApply(rootsyncset *v1alpha1.RootSyncSet) (schema.GroupVersionResource, *unstructured.Unstructured, error) { - gv, err := schema.ParseGroupVersion(rootSyncApiVersion) +func toCanonicalClusterRef(ref *v1alpha1.ClusterRef, rssNamespace string) (v1alpha1.ClusterRef, error) { + ns := ref.Namespace + if ns == "" { + ns = rssNamespace + } + apiVersion := ref.ApiVersion + if apiVersion == "" { + switch ref.Kind { + case containerClusterKind: + apiVersion = containerClusterApiVersion + case configControllerKind: + apiVersion = configControllerApiVersion + default: + return v1alpha1.ClusterRef{}, fmt.Errorf("clusterRef references unknown kind %q", ref.Kind) + } + } + return v1alpha1.ClusterRef{ + ApiVersion: apiVersion, + Kind: ref.Kind, + Name: ref.Name, + Namespace: ns, + }, nil +} + +func (r *RootSyncSetReconciler) updateStatus(ctx context.Context, rss *v1alpha1.RootSyncSet, results reconcileResult) error { + crss := make([]v1alpha1.ClusterRefStatus, 0) + + for _, clusterRef := range rss.Spec.ClusterRefs { + clusterRefName := clusterRef.Kind + ":" + clusterRef.Name + res := results[clusterRefName] + crss = append(crss, v1alpha1.ClusterRefStatus{ + ApiVersion: clusterRef.ApiVersion, + Kind: clusterRef.Kind, + Name: clusterRef.Name, + Namespace: clusterRef.Namespace, + SyncStatus: res.status, + }) + } + + // Don't update if there are no changes. + if equality.Semantic.DeepEqual(rss.Status.ClusterRefStatuses, crss) { + return nil + } + + rss.Status.ClusterRefStatuses = crss + return r.Client.Status().Update(ctx, rss) +} + +type reconcileResult map[string]clusterRefResult + +func (r reconcileResult) Errors() []error { + var errs []error + for _, crr := range r { + if crr.clientError != nil { + errs = append(errs, crr.clientError) + } + if crr.patchError != nil { + errs = append(errs, crr.patchError) + } + if crr.statusError != nil { + errs = append(errs, crr.statusError) + } + } + return errs +} + +// TODO: Improve the formatting of the printed errors here. +func (r reconcileResult) Error() string { + var sb strings.Builder + for clusterRef, res := range r { + if res.clientError != nil { + sb.WriteString(fmt.Sprintf("failed to create client for %s: %v\n", clusterRef, res.clientError)) + } + if res.patchError != nil { + sb.WriteString(fmt.Sprintf("failed to patch %s: %v\n", clusterRef, res.patchError)) + } + if res.statusError != nil { + sb.WriteString(fmt.Sprintf("failed to check status for %s: %v\n", clusterRef, res.statusError)) + } + } + return sb.String() +} + +type clusterRefResult struct { + clientError error + patchError error + statusError error + status string +} + +// patchRootSync patches the RootSync in the remote clusters targeted by +// the clusterRefs based on the latest revision of the template in the RootSyncSet. +func (r *RootSyncSetReconciler) patchRootSync(ctx context.Context, client dynamic.Interface, name string, rss *v1alpha1.RootSyncSet) error { + newRootSync, err := BuildObjectsToApply(rss) if err != nil { - return schema.GroupVersionResource{}, nil, fmt.Errorf("failed to parse group version when building object: %w", err) + return err + } + data, err := json.Marshal(newRootSync) + if err != nil { + return fmt.Errorf("failed to encode root sync to JSON: %w", err) + } + rs, err := client.Resource(rootSyncGVR).Namespace(rootSyncNamespace).Patch(ctx, name, types.ApplyPatchType, data, metav1.PatchOptions{FieldManager: name}) + if err != nil { + return fmt.Errorf("failed to patch RootSync: %w", err) } - rootSyncRes := schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: "rootsyncs"} + klog.Infof("Create/Update resource %s as %v", name, rs) + return nil +} + +// setupWatches makes sure we have the necessary watches running against +// the remote clusters we care about. +func (r *RootSyncSetReconciler) setupWatches(ctx context.Context, client dynamic.Interface, rssName, ns string, clusterRef v1alpha1.ClusterRef) { + r.mutex.Lock() + defer r.mutex.Unlock() + nn := types.NamespacedName{ + Namespace: ns, + Name: rssName, + } + + // If we already have a watch running, make sure we have the current RootSyncSet + // listed in the liens map. + if w, found := r.watchers[clusterRef]; found { + w.liens[nn] = struct{}{} + return + } + + // Since we don't currently have a watch running, create a new watcher + // and add it to the map of watchers. + watcherCtx, cancelFunc := context.WithCancel(context.Background()) + w := &watcher{ + clusterRef: clusterRef, + ctx: watcherCtx, + cancelFunc: cancelFunc, + client: client, + channel: r.channel, + liens: make(map[types.NamespacedName]struct{}), + } + go w.watch() + r.watchers[clusterRef] = w +} + +// pruneWatches removes the current RootSyncSet from the liens map of all watchers +// that it no longer needs. If any of the watchers are no longer used by any RootSyncSets, +// they are shut down. +func (r *RootSyncSetReconciler) pruneWatches(rssnn types.NamespacedName, clusterRefs []*v1alpha1.ClusterRef) { + r.mutex.Lock() + defer r.mutex.Unlock() + + // Look through all watchers to check if it used to be needed by the RootSyncSet + // but is no longer. + for clusterRef, w := range r.watchers { + // If the watcher is still needed, we don't need to do anything. + var found bool + for _, cr := range clusterRefs { + if clusterRef == *cr { + found = true + } + } + if found { + continue + } + + // Delete the current RootSyncSet from the list of liens (it it exists) + delete(w.liens, rssnn) + // If no other RootSyncSets need the watch, stop it and remove the watcher from the map. + if len(w.liens) == 0 { + klog.Infof("clusterRef %s is no longer needed, so closing watch", clusterRef.Name) + w.cancelFunc() + delete(r.watchers, clusterRef) + } + } +} + +// BuildObjectsToApply config root sync +func BuildObjectsToApply(rootsyncset *v1alpha1.RootSyncSet) (*unstructured.Unstructured, error) { newRootSync, err := runtime.DefaultUnstructuredConverter.ToUnstructured(rootsyncset.Spec.Template) - newRootSync["apiVersion"] = rootSyncApiVersion - newRootSync["kind"] = rootSyncKind - newRootSync["metadata"] = map[string]string{"name": rootsyncset.Name, - "namespace": rootSyncNamespace} - fmt.Printf("rootsync looks like %v", newRootSync) if err != nil { - return schema.GroupVersionResource{}, nil, fmt.Errorf("failed to convert to unstructured type: %w", err) + return nil, err } u := unstructured.Unstructured{Object: newRootSync} - - return rootSyncRes, &u, nil + u.SetGroupVersionKind(rootSyncGVK) + u.SetName(rootsyncset.Name) + u.SetNamespace(rootSyncNamespace) + u.SetLabels(map[string]string{ + rootSyncSetNameLabel: rootsyncset.Name, + rootSyncSetNamespaceLabel: rootsyncset.Namespace, + }) + if err != nil { + return nil, fmt.Errorf("failed to convert to unstructured type: %w", err) + } + return &u, nil } // SetupWithManager sets up the controller with the Manager. @@ -176,25 +403,46 @@ func (r *RootSyncSetReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.RootSyncSet{}). + Watches( + &source.Channel{Source: r.channel}, + handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request { + var rssName string + var rssNamespace string + if o.GetLabels() != nil { + rssName = o.GetLabels()[rootSyncSetNameLabel] + rssNamespace = o.GetLabels()[rootSyncSetNamespaceLabel] + } + if rssName == "" || rssNamespace == "" { + return []reconcile.Request{} + } + klog.Infof("Resource %s contains a label for %s", o.GetName(), rssName) + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: rssNamespace, + Name: rssName, + }, + }, + } + }), + ). Complete(r) } func (r *RootSyncSetReconciler) deleteExternalResources(ctx context.Context, rootsyncset *v1alpha1.RootSyncSet) error { var deleteErrs []error - for _, clusterRef := range rootsyncset.Spec.ClusterRefs { + for _, cr := range rootsyncset.Spec.ClusterRefs { + clusterRef, err := toCanonicalClusterRef(cr, rootsyncset.Namespace) + if err != nil { + return err + } myClient, err := r.GetClient(ctx, clusterRef, rootsyncset.Namespace) if err != nil { deleteErrs = append(deleteErrs, fmt.Errorf("failed to get client when delete resource: %w", err)) continue } klog.Infof("deleting external resource %s ...", rootsyncset.Name) - gv, err := schema.ParseGroupVersion(rootSyncApiVersion) - if err != nil { - deleteErrs = append(deleteErrs, fmt.Errorf("failed to parse group version when deleting external resrouces: %w", err)) - continue - } - rootSyncRes := schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: "rootsyncs"} - err = myClient.Resource(rootSyncRes).Namespace("config-management-system").Delete(ctx, rootsyncset.Name, metav1.DeleteOptions{}) + err = myClient.Resource(rootSyncGVR).Namespace("config-management-system").Delete(ctx, rootsyncset.Name, metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { deleteErrs = append(deleteErrs, fmt.Errorf("failed to delete external resource : %w", err)) } @@ -267,7 +515,7 @@ func (r *RootSyncSetReconciler) GetCCRESTConfig(ctx context.Context, cluster *un return restConfig, nil } -func (r *RootSyncSetReconciler) GetClient(ctx context.Context, ref *v1alpha1.ClusterRef, ns string) (dynamic.Interface, error) { +func (r *RootSyncSetReconciler) GetClient(ctx context.Context, ref v1alpha1.ClusterRef, ns string) (dynamic.Interface, error) { key := types.NamespacedName{Namespace: ref.Namespace, Name: ref.Name} if key.Namespace == "" { key.Namespace = ns @@ -287,9 +535,9 @@ func (r *RootSyncSetReconciler) GetClient(ctx context.Context, ref *v1alpha1.Clu if err := r.Get(ctx, key, u); err != nil { return nil, fmt.Errorf("failed to get cluster: %w", err) } - if ref.Kind == "ContainerCluster" { + if ref.Kind == containerClusterKind { config, err = r.GetGKERESTConfig(ctx, u) - } else if ref.Kind == "ConfigControllerInstance" { + } else if ref.Kind == configControllerKind { config, err = r.GetCCRESTConfig(ctx, u) //TODO: tmp workaround, update after ACP add new fields } else { return nil, fmt.Errorf("failed to find target cluster, cluster kind has to be ContainerCluster or ConfigControllerInstance") diff --git a/porch/controllers/rootsyncsets/pkg/controllers/rootsyncset/status.go b/porch/controllers/rootsyncsets/pkg/controllers/rootsyncset/status.go new file mode 100644 index 0000000000..d66cbadb3e --- /dev/null +++ b/porch/controllers/rootsyncsets/pkg/controllers/rootsyncset/status.go @@ -0,0 +1,134 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootsyncset + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" +) + +// checkSyncStatus fetches the RootSync using the provided client and computes the sync status. The rules +// for computing status here mirrors the one used in the status command in the nomos cli. +func checkSyncStatus(ctx context.Context, client dynamic.Interface, rssName string) (string, error) { + rs, err := client.Resource(rootSyncGVR).Namespace(rootSyncNamespace).Get(ctx, rssName, metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("failed to get RootSync: %w", err) + } + + conditions, _, err := unstructured.NestedSlice(rs.Object, "status", "conditions") + if err != nil { + return "", fmt.Errorf("failed to extract conditions from RootSync: %w", err) + } + + val, found, err := getConditionStatus(conditions, "Stalled") + if err != nil { + return "", fmt.Errorf("error fetching condition 'Stalled' from conditions slice: %w", err) + } + if found && val == "True" { + return "Stalled", nil + } + + val, found, err = getConditionStatus(conditions, "Reconciling") + if err != nil { + return "", fmt.Errorf("error fetching condition 'Reconciling' from conditions slice: %w", err) + } + if found && val == "True" { + return "Reconciling", nil + } + + cond, found, err := getCondition(conditions, "Syncing") + if err != nil { + return "", fmt.Errorf("error fetching condition 'Syncing' from conditions slice: %w", err) + } + if !found { + return "Reconciling", nil + } + + errCount, err := extractErrorCount(cond) + if err != nil { + return "", fmt.Errorf("error extracting error count from 'Syncing' condition: %w", err) + } + if errCount > 0 { + return "Error", nil + } + + val, err = extractStringField(cond, "status") + if err != nil { + return "", fmt.Errorf("error extracting status of 'Syncing' condition: %w", err) + } + if val == "True" { + return "Pending", nil + } + + return "Synced", nil +} + +func getConditionStatus(conditions []interface{}, condType string) (string, bool, error) { + cond, found, err := getCondition(conditions, condType) + if err != nil { + return "", false, err + } + if !found { + return "", false, nil + } + s, err := extractStringField(cond, "status") + if err != nil { + return "", false, err + } + return s, true, nil +} + +func getCondition(conditions []interface{}, condType string) (map[string]interface{}, bool, error) { + for i := range conditions { + cond, ok := conditions[i].(map[string]interface{}) + if !ok { + return map[string]interface{}{}, false, fmt.Errorf("failed to extract condition %d from slice", i) + } + t, err := extractStringField(cond, "type") + if err != nil { + return map[string]interface{}{}, false, err + } + + if t != condType { + continue + } + return cond, true, nil + } + return map[string]interface{}{}, false, nil +} + +func extractStringField(condition map[string]interface{}, field string) (string, error) { + t, ok := condition[field] + if !ok { + return "", fmt.Errorf("condition does not have a type field") + } + condVal, ok := t.(string) + if !ok { + return "", fmt.Errorf("value of '%s' condition is not of type 'string'", field) + } + return condVal, nil +} + +func extractErrorCount(cond map[string]interface{}) (int64, error) { + count, found, err := unstructured.NestedInt64(cond, "errorSummary", "totalCount") + if err != nil || !found { + return 0, err + } + return count, nil +} diff --git a/porch/controllers/rootsyncsets/pkg/controllers/rootsyncset/watcher.go b/porch/controllers/rootsyncsets/pkg/controllers/rootsyncset/watcher.go new file mode 100644 index 0000000000..7dda34a3d4 --- /dev/null +++ b/porch/controllers/rootsyncsets/pkg/controllers/rootsyncset/watcher.go @@ -0,0 +1,145 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootsyncset + +import ( + "context" + "fmt" + "time" + + "github.com/GoogleContainerTools/kpt/porch/controllers/rootsyncsets/api/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/event" +) + +const ( + minReconnectDelay = 1 * time.Second + maxReconnectDelay = 30 * time.Second +) + +type watcher struct { + clusterRef v1alpha1.ClusterRef + + ctx context.Context + + cancelFunc context.CancelFunc + + client dynamic.Interface + + channel chan event.GenericEvent + + liens map[types.NamespacedName]struct{} +} + +func (w watcher) watch() { + clusterRefName := fmt.Sprintf("%s:%s", w.clusterRef.Kind, w.clusterRef.Name) + var events <-chan watch.Event + var watcher watch.Interface + var bookmark string + defer func() { + if watcher != nil { + watcher.Stop() + } + }() + + reconnect := newBackoffTimer(minReconnectDelay, maxReconnectDelay) + defer reconnect.Stop() + +loop: + for { + select { + case <-reconnect.channel(): + var err error + klog.Infof("Starting watch for %s... ", clusterRefName) + watcher, err = w.client.Resource(rootSyncGVR).Watch(w.ctx, v1.ListOptions{}) + if err != nil { + klog.Errorf("Cannot start watch for %s: %v; will retry", clusterRefName, err) + reconnect.backoff() + } else { + klog.Infof("Watch successfully started for %s.", clusterRefName) + events = watcher.ResultChan() + } + case e, ok := <-events: + if !ok { + klog.Errorf("Watch event stream closed for cluster %s. Will restart watch from bookmark %q", clusterRefName, bookmark) + watcher.Stop() + events = nil + watcher = nil + + // Initiate reconnect + reconnect.reset() + } else if obj, ok := e.Object.(*unstructured.Unstructured); ok { + if e.Type == watch.Bookmark { + bookmark = obj.GetResourceVersion() + klog.Infof("Watch bookmark for %s: %q", clusterRefName, bookmark) + } else { + w.channel <- event.GenericEvent{ + Object: obj, + } + } + } else { + klog.V(5).Infof("Received unexpected watch event Object from %s: %T", e.Object, clusterRefName) + } + case <-w.ctx.Done(): + if w.ctx.Err() != nil { + klog.V(2).Infof("exiting watcher for %s, because context is done: %v", clusterRefName, w.ctx.Err()) + } else { + klog.Infof("Watch background routine exiting for %s; context done", clusterRefName) + } + break loop + } + } +} + +// TODO: This comes from Porch. Find a place to put code that can be shared. +type backoffTimer struct { + min, max, curr time.Duration + timer *time.Timer +} + +func newBackoffTimer(min, max time.Duration) *backoffTimer { + return &backoffTimer{ + min: min, + max: max, + timer: time.NewTimer(min), + } +} + +func (t *backoffTimer) Stop() bool { + return t.timer.Stop() +} + +func (t *backoffTimer) channel() <-chan time.Time { + return t.timer.C +} + +func (t *backoffTimer) reset() bool { + t.curr = t.min + return t.timer.Reset(t.curr) +} + +func (t *backoffTimer) backoff() bool { + curr := t.curr * 2 + if curr > t.max { + curr = t.max + } + t.curr = curr + return t.timer.Reset(curr) +}