diff --git a/pkg/kstatus/polling/clusterreader/caching_reader.go b/pkg/kstatus/polling/clusterreader/caching_reader.go index 94f646a3..91e80341 100644 --- a/pkg/kstatus/polling/clusterreader/caching_reader.go +++ b/pkg/kstatus/polling/clusterreader/caching_reader.go @@ -11,9 +11,14 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/pager" "sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine" "sigs.k8s.io/cli-utils/pkg/object" "sigs.k8s.io/controller-runtime/pkg/client" @@ -232,13 +237,11 @@ func (c *CachingClusterReader) Sync(ctx context.Context) error { } return err } - var listOptions []client.ListOption + ns := "" if mapping.Scope == meta.RESTScopeNamespace { - listOptions = append(listOptions, client.InNamespace(gn.Namespace)) + ns = gn.Namespace } - var list unstructured.UnstructuredList - list.SetGroupVersionKind(mapping.GroupVersionKind) - err = c.reader.List(ctx, &list, listOptions...) + list, err := c.listUnstructured(ctx, mapping.GroupVersionKind, ns) if err != nil { // If the context was cancelled, we just stop the work and return // the error. @@ -254,9 +257,82 @@ func (c *CachingClusterReader) Sync(ctx context.Context) error { continue } cache[gn] = cacheEntry{ - resources: list, + resources: *list, } } c.cache = cache return nil } + +// listUnstructured performs one or more LIST calls, paginating the requests +// and aggregating the results. If aggregated, only the ResourceVersion, +// SelfLink, and Items will be populated. The default page size is 500. +func (c *CachingClusterReader) listUnstructured( + ctx context.Context, + gvk schema.GroupVersionKind, + namespace string, +) (*unstructured.UnstructuredList, error) { + mOpts := metav1.ListOptions{} + mOpts.SetGroupVersionKind(gvk) + obj, _, err := pager.New(c.listPageFunc(namespace)).List(ctx, mOpts) + if err != nil { + return nil, err + } + + switch t := obj.(type) { + case *unstructured.UnstructuredList: + // all in one + return t, nil + case *metainternalversion.List: + // aggregated result + u := &unstructured.UnstructuredList{} + u.SetGroupVersionKind(gvk) + // Only ResourceVersion & SelfLink are copied into the aggregated result + // by ListPager. + if t.ResourceVersion != "" { + u.SetResourceVersion(t.ResourceVersion) + } + if t.SelfLink != "" { + u.SetSelfLink(t.SelfLink) + } + u.Items = make([]unstructured.Unstructured, len(t.Items)) + for i, item := range t.Items { + ui, ok := item.(*unstructured.Unstructured) + if !ok { + return nil, fmt.Errorf("unexpected list item type: %t", item) + } + u.Items[i] = *ui + } + return u, nil + default: + return nil, fmt.Errorf("unexpected list type: %t", t) + } +} + +func (c *CachingClusterReader) listPageFunc(namespace string) pager.ListPageFunc { + return func(ctx context.Context, mOpts metav1.ListOptions) (runtime.Object, error) { + mOptsCopy := mOpts + labelSelector, err := labels.Parse(mOpts.LabelSelector) + if err != nil { + return nil, fmt.Errorf("failed to parse label selector: %w", err) + } + fieldSelector, err := fields.ParseSelector(mOpts.FieldSelector) + if err != nil { + return nil, fmt.Errorf("failed to parse field selector: %w", err) + } + cOpts := &client.ListOptions{ + LabelSelector: labelSelector, + FieldSelector: fieldSelector, + Namespace: namespace, + Limit: mOpts.Limit, + Continue: mOpts.Continue, + Raw: &mOptsCopy, + } + var list unstructured.UnstructuredList + list.SetGroupVersionKind(mOpts.GroupVersionKind()) + // Note: client.ListOptions only supports Exact ResourceVersion matching. + // So leave ResourceVersion blank to get Any ResourceVersion. + err = c.reader.List(ctx, &list, cOpts) + return &list, err + } +} diff --git a/pkg/kstatus/polling/clusterreader/caching_reader_test.go b/pkg/kstatus/polling/clusterreader/caching_reader_test.go index 7d4aabd0..fbcf05cd 100644 --- a/pkg/kstatus/polling/clusterreader/caching_reader_test.go +++ b/pkg/kstatus/polling/clusterreader/caching_reader_test.go @@ -5,16 +5,21 @@ package clusterreader import ( "context" + "encoding/json" "fmt" "sort" + "strconv" "testing" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/cli-utils/pkg/object" "sigs.k8s.io/cli-utils/pkg/testutil" @@ -29,12 +34,22 @@ var ( ) func TestSync(t *testing.T) { + // Use a custom Asserter to customize the comparison options + asserter := testutil.NewAsserter( + cmpopts.EquateErrors(), + gkNamespaceComparer(), + cacheEntryComparer(), + ) + testCases := map[string]struct { identifiers object.ObjMetadataSet + clusterObjs map[gkNamespace][]unstructured.Unstructured expectedSynced []gkNamespace + expectedCached map[gkNamespace]cacheEntry }{ "no identifiers": { - identifiers: object.ObjMetadataSet{}, + identifiers: object.ObjMetadataSet{}, + expectedCached: map[gkNamespace]cacheEntry{}, }, "same GVK in multiple namespaces": { identifiers: object.ObjMetadataSet{ @@ -49,30 +64,146 @@ func TestSync(t *testing.T) { Namespace: "Bar", }, }, + clusterObjs: map[gkNamespace][]unstructured.Unstructured{ + {GroupKind: deploymentGVK.GroupKind(), Namespace: "Foo"}: { + { + Object: map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": map[string]interface{}{ + "name": "deployment-1", + "namespace": "Foo", + }, + }, + }, + }, + {GroupKind: deploymentGVK.GroupKind(), Namespace: "Bar"}: { + { + Object: map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": map[string]interface{}{ + "name": "deployment-2", + "namespace": "Bar", + }, + }, + }, + }, + }, expectedSynced: []gkNamespace{ - { - GroupKind: deploymentGVK.GroupKind(), - Namespace: "Foo", + {GroupKind: deploymentGVK.GroupKind(), Namespace: "Foo"}, + {GroupKind: rsGVK.GroupKind(), Namespace: "Foo"}, + {GroupKind: podGVK.GroupKind(), Namespace: "Foo"}, + {GroupKind: deploymentGVK.GroupKind(), Namespace: "Bar"}, + {GroupKind: rsGVK.GroupKind(), Namespace: "Bar"}, + {GroupKind: podGVK.GroupKind(), Namespace: "Bar"}, + }, + expectedCached: map[gkNamespace]cacheEntry{ + {GroupKind: deploymentGVK.GroupKind(), Namespace: "Foo"}: { + resources: unstructured.UnstructuredList{ + Object: map[string]interface{}{"apiVersion": "apps/v1", "kind": "Deployment"}, + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": map[string]interface{}{ + "name": "deployment-1", + "namespace": "Foo", + }, + }, + }, + }, + }, }, - { - GroupKind: rsGVK.GroupKind(), - Namespace: "Foo", + {GroupKind: rsGVK.GroupKind(), Namespace: "Foo"}: { + resources: unstructured.UnstructuredList{ + Object: map[string]interface{}{"apiVersion": "apps/v1", "kind": "ReplicaSet"}, + }, }, - { - GroupKind: podGVK.GroupKind(), - Namespace: "Foo", + {GroupKind: podGVK.GroupKind(), Namespace: "Foo"}: { + resources: unstructured.UnstructuredList{ + Object: map[string]interface{}{"apiVersion": "v1", "kind": "Pod"}, + }, }, - { - GroupKind: deploymentGVK.GroupKind(), - Namespace: "Bar", + {GroupKind: deploymentGVK.GroupKind(), Namespace: "Bar"}: { + resources: unstructured.UnstructuredList{ + Object: map[string]interface{}{"apiVersion": "apps/v1", "kind": "Deployment"}, + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "metadata": map[string]interface{}{ + "name": "deployment-2", + "namespace": "Bar", + }, + }, + }, + }, + }, }, - { - GroupKind: rsGVK.GroupKind(), - Namespace: "Bar", + {GroupKind: rsGVK.GroupKind(), Namespace: "Bar"}: { + resources: unstructured.UnstructuredList{ + Object: map[string]interface{}{"apiVersion": "apps/v1", "kind": "ReplicaSet"}, + }, }, - { - GroupKind: podGVK.GroupKind(), - Namespace: "Bar", + {GroupKind: podGVK.GroupKind(), Namespace: "Bar"}: { + resources: unstructured.UnstructuredList{ + Object: map[string]interface{}{"apiVersion": "v1", "kind": "Pod"}, + }, + }, + }, + }, + } + + barPodGKN := gkNamespace{GroupKind: podGVK.GroupKind(), Namespace: "Bar"} + // 1001 = 3 pages of 500 + barObjs := make([]unstructured.Unstructured, 1001) + for i := 0; i < len(barObjs); i++ { + barObjs[i] = unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": podGVK.GroupVersion().String(), + "kind": podGVK.Kind, + "metadata": map[string]interface{}{ + "name": fmt.Sprintf("pod-%d", i), + "namespace": barPodGKN.Namespace, + }, + }, + } + } + testCases["paginated"] = struct { + identifiers object.ObjMetadataSet + clusterObjs map[gkNamespace][]unstructured.Unstructured + expectedSynced []gkNamespace + expectedCached map[gkNamespace]cacheEntry + }{ + identifiers: object.ObjMetadataSet{ + // any one pod + { + GroupKind: podGVK.GroupKind(), + Name: "pod-99", + Namespace: barPodGKN.Namespace, + }, + }, + clusterObjs: map[gkNamespace][]unstructured.Unstructured{ + barPodGKN: barObjs, + }, + expectedSynced: []gkNamespace{ + // expect 3 paginated calls to LIST + barPodGKN, + barPodGKN, + barPodGKN, + }, + expectedCached: map[gkNamespace]cacheEntry{ + barPodGKN: { + resources: unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "apiVersion": podGVK.GroupVersion().String(), + "kind": podGVK.Kind, + }, + // all the deployments in the same namespace + Items: barObjs, }, }, }, @@ -86,7 +217,9 @@ func TestSync(t *testing.T) { for tn, tc := range testCases { t.Run(tn, func(t *testing.T) { - fakeReader := &fakeReader{} + fakeReader := &fakeReader{ + clusterObjs: tc.clusterObjs, + } clusterReader, err := newCachingClusterReader(fakeReader, fakeMapper, tc.identifiers) require.NoError(t, err) @@ -98,9 +231,8 @@ func TestSync(t *testing.T) { sortGVKNamespaces(synced) expectedSynced := tc.expectedSynced sortGVKNamespaces(expectedSynced) - assert.Equal(t, expectedSynced, synced) - - assert.Equal(t, len(tc.expectedSynced), len(clusterReader.cache)) + asserter.Equal(t, expectedSynced, synced) + asserter.Equal(t, tc.expectedCached, clusterReader.cache) }) } } @@ -208,6 +340,7 @@ func sortGVKNamespaces(gvkNamespaces []gkNamespace) { } type fakeReader struct { + clusterObjs map[gkNamespace][]unstructured.Unstructured syncedGVKNamespaces []gkNamespace err error } @@ -218,19 +351,78 @@ func (f *fakeReader) Get(_ context.Context, _ client.ObjectKey, _ client.Object) //nolint:gocritic func (f *fakeReader) List(_ context.Context, list client.ObjectList, opts ...client.ListOption) error { - var namespace string - for _, opt := range opts { - switch opt := opt.(type) { - case client.InNamespace: - namespace = string(opt) - } - } + listOpts := &client.ListOptions{} + listOpts.ApplyOptions(opts) gvk := list.GetObjectKind().GroupVersionKind() - f.syncedGVKNamespaces = append(f.syncedGVKNamespaces, gkNamespace{ + query := gkNamespace{ GroupKind: gvk.GroupKind(), - Namespace: namespace, + Namespace: listOpts.Namespace, + } + + f.syncedGVKNamespaces = append(f.syncedGVKNamespaces, query) + + if f.err != nil { + return f.err + } + + results, ok := f.clusterObjs[query] + if !ok { + // no results + return nil + } + + uList, ok := list.(*unstructured.UnstructuredList) + if !ok { + return fmt.Errorf("unexpected list type: %T", list) + } + + if listOpts.Limit > 0 && len(results) > 0 { + // return paginated results from Continue to Continue + Limit + start := int64(0) + if listOpts.Continue != "" { + var err error + start, err = strconv.ParseInt(listOpts.Continue, 10, 64) + if err != nil { + return fmt.Errorf("invalid continue value: %q", listOpts.Continue) + } + } + end := start + listOpts.Limit + max := int64(len(results)) + if end > max { + end = max + } else { + // set continue if more results are available + uList.SetContinue(strconv.FormatInt(end, 10)) + } + uList.Items = append(uList.Items, results[start:end]...) + } else { + uList.Items = results + } + + return nil +} + +func gkNamespaceComparer() cmp.Option { + return cmp.Comparer(func(x, y gkNamespace) bool { + return x.GroupKind == y.GroupKind && + x.Namespace == y.Namespace }) +} - return f.err +func cacheEntryComparer() cmp.Option { + return cmp.Comparer(func(x, y cacheEntry) bool { + if x.err != y.err { + return false + } + xBytes, err := json.Marshal(x.resources) + if err != nil { + panic(fmt.Sprintf("failed to marshal item x to json: %v", err)) + } + yBytes, err := json.Marshal(y.resources) + if err != nil { + panic(fmt.Sprintf("failed to marshal item y to json: %v", err)) + } + return string(xBytes) == string(yBytes) + }) }