diff --git a/pkg/data/generic_test.go b/pkg/data/generic_test.go index e663e3953..9519b1ea5 100644 --- a/pkg/data/generic_test.go +++ b/pkg/data/generic_test.go @@ -2,30 +2,40 @@ package data import ( "context" - "encoding/json" + "errors" "fmt" - "reflect" "testing" "time" "github.com/open-policy-agent/kube-mgmt/pkg/types" + apiv1 "k8s.io/api/core/v1" + + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + + "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" ) +type testCase struct { + label string + resourceType types.ResourceType + prefix string + objs []runtime.Object + expected string + // shared between tests + expectedJson map[string]interface{} +} + func TestGenericSync(t *testing.T) { - testCases := []struct { - label string - resourceType types.ResourceType - prefix string - objs []runtime.Object - expected string - }{ + testCases := []testCase{ { label: "Single Cluster Resource", resourceType: types.ResourceType{ @@ -41,7 +51,8 @@ func TestGenericSync(t *testing.T) { Kind: "Node", }, ObjectMeta: metav1.ObjectMeta{ - Name: "node1", + Name: "node1", + ResourceVersion: "0", }, Spec: apiv1.NodeSpec{}, Status: apiv1.NodeStatus{}, @@ -53,7 +64,8 @@ func TestGenericSync(t *testing.T) { "kind": "Node", "metadata":{ "creationTimestamp":null, - "name":"node1" + "name":"node1", + "resourceVersion":"0" }, "spec":{ }, @@ -94,7 +106,8 @@ func TestGenericSync(t *testing.T) { Kind: "Node", }, ObjectMeta: metav1.ObjectMeta{ - Name: "node1", + Name: "node1", + ResourceVersion: "0", }, Spec: apiv1.NodeSpec{}, Status: apiv1.NodeStatus{}, @@ -106,7 +119,8 @@ func TestGenericSync(t *testing.T) { "kind": "Node", "metadata":{ "creationTimestamp":null, - "name":"node1" + "name":"node1", + "resourceVersion":"0" }, "spec":{ }, @@ -147,7 +161,8 @@ func TestGenericSync(t *testing.T) { Kind: "Node", }, ObjectMeta: metav1.ObjectMeta{ - Name: "node1", + Name: "node1", + ResourceVersion: "0", }, Spec: apiv1.NodeSpec{}, Status: apiv1.NodeStatus{}, @@ -158,7 +173,8 @@ func TestGenericSync(t *testing.T) { Kind: "Node", }, ObjectMeta: metav1.ObjectMeta{ - Name: "node2", + Name: "node2", + ResourceVersion: "0", }, Spec: apiv1.NodeSpec{}, Status: apiv1.NodeStatus{}, @@ -169,7 +185,8 @@ func TestGenericSync(t *testing.T) { Kind: "Node", }, ObjectMeta: metav1.ObjectMeta{ - Name: "node3", + Name: "node3", + ResourceVersion: "0", }, Spec: apiv1.NodeSpec{}, Status: apiv1.NodeStatus{}, @@ -181,7 +198,8 @@ func TestGenericSync(t *testing.T) { "kind": "Node", "metadata":{ "creationTimestamp":null, - "name":"node1" + "name":"node1", + "resourceVersion":"0" }, "spec":{ }, @@ -210,7 +228,8 @@ func TestGenericSync(t *testing.T) { "kind": "Node", "metadata":{ "creationTimestamp":null, - "name":"node2" + "name":"node2", + "resourceVersion":"0" }, "spec":{ }, @@ -239,7 +258,8 @@ func TestGenericSync(t *testing.T) { "kind": "Node", "metadata":{ "creationTimestamp":null, - "name":"node3" + "name":"node3", + "resourceVersion":"0" }, "spec":{ }, @@ -280,8 +300,9 @@ func TestGenericSync(t *testing.T) { Kind: "Pod", }, ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "ns1", + Name: "pod1", + Namespace: "ns1", + ResourceVersion: "0", }, Spec: apiv1.PodSpec{}, Status: apiv1.PodStatus{}, @@ -295,7 +316,8 @@ func TestGenericSync(t *testing.T) { "metadata":{ "creationTimestamp":null, "name":"pod1", - "namespace":"ns1" + "namespace":"ns1", + "resourceVersion":"0" }, "spec":{ "containers":null @@ -321,8 +343,9 @@ func TestGenericSync(t *testing.T) { Kind: "Pod", }, ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "ns1", + Name: "pod1", + Namespace: "ns1", + ResourceVersion: "0", }, Spec: apiv1.PodSpec{}, Status: apiv1.PodStatus{}, @@ -336,7 +359,8 @@ func TestGenericSync(t *testing.T) { "metadata":{ "creationTimestamp":null, "name":"pod1", - "namespace":"ns1" + "namespace":"ns1", + "resourceVersion":"0" }, "spec":{ "containers":null @@ -362,8 +386,9 @@ func TestGenericSync(t *testing.T) { Kind: "Pod", }, ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "ns1", + Name: "pod1", + Namespace: "ns1", + ResourceVersion: "0", }, Spec: apiv1.PodSpec{}, Status: apiv1.PodStatus{}, @@ -374,8 +399,9 @@ func TestGenericSync(t *testing.T) { Kind: "Pod", }, ObjectMeta: metav1.ObjectMeta{ - Name: "pod2", - Namespace: "ns1", + Name: "pod2", + Namespace: "ns1", + ResourceVersion: "0", }, Spec: apiv1.PodSpec{}, Status: apiv1.PodStatus{}, @@ -386,8 +412,9 @@ func TestGenericSync(t *testing.T) { Kind: "Pod", }, ObjectMeta: metav1.ObjectMeta{ - Name: "pod1", - Namespace: "ns2", + Name: "pod1", + Namespace: "ns2", + ResourceVersion: "0", }, Spec: apiv1.PodSpec{}, Status: apiv1.PodStatus{}, @@ -401,7 +428,8 @@ func TestGenericSync(t *testing.T) { "metadata":{ "creationTimestamp":null, "name":"pod1", - "namespace":"ns1" + "namespace":"ns1", + "resourceVersion":"0" }, "spec":{ "containers":null @@ -415,7 +443,8 @@ func TestGenericSync(t *testing.T) { "metadata":{ "creationTimestamp":null, "name":"pod2", - "namespace":"ns1" + "namespace":"ns1", + "resourceVersion":"0" }, "spec":{ "containers":null @@ -431,7 +460,8 @@ func TestGenericSync(t *testing.T) { "metadata":{ "creationTimestamp":null, "name":"pod1", - "namespace":"ns2" + "namespace":"ns2", + "resourceVersion":"0" }, "spec":{ "containers":null @@ -451,95 +481,218 @@ func TestGenericSync(t *testing.T) { for _, tc := range testCases { - var expectedJson map[string]interface{} - mustUnmarshalJSON(t, []byte(tc.expected), &expectedJson) + tc := tc // We will be running the tests in parallel, so avoid issues with loop var + mustUnmarshalJSON(t, []byte(tc.expected), &tc.expectedJson) t.Run(fmt.Sprintf("%s - GenerateSyncPayload", tc.label), func(t *testing.T) { - result := mustGenerateSyncPayload(t, tc.resourceType, tc.objs) - if !reflect.DeepEqual(result, expectedJson) { - t.Errorf("Sync payload expected:\n\n%v\n\nActual:\n\n%v\n", expectedJson, result) - t.Fail() - } + t.Parallel() + tc.testGenerateSyncPayload(t) }) t.Run(fmt.Sprintf("%s - Run", tc.label), func(t *testing.T) { - result := mustRun(t, sc, tc.resourceType, tc.prefix, tc.objs) - if !reflect.DeepEqual(result, expectedJson) { - t.Errorf("Fake data expected:\n\n%v\n\nActual:\n\n%v\n", expectedJson, result) - t.Fail() - } + t.Parallel() + tc.testRun(t, sc) + }) + + t.Run(fmt.Sprintf("%s - Add", tc.label), func(t *testing.T) { + t.Parallel() + tc.testAdd(t, sc) + }) + + t.Run(fmt.Sprintf("%s - Delete", tc.label), func(t *testing.T) { + t.Parallel() + tc.testDelete(t, sc) + }) + + t.Run(fmt.Sprintf("%s - Update", tc.label), func(t *testing.T) { + t.Parallel() + tc.testUpdate(t, sc) + }) + + t.Run(fmt.Sprintf("%s - Retry Add", tc.label), func(t *testing.T) { + t.Parallel() + tc.testRetryAdd(t, sc) + }) + + t.Run(fmt.Sprintf("%s - Retry Update", tc.label), func(t *testing.T) { + t.Parallel() + tc.testRetryUpdate(t, sc) + }) + + t.Run(fmt.Sprintf("%s - Retry Delete", tc.label), func(t *testing.T) { + t.Parallel() + tc.testRetryDelete(t, sc) }) } } -func mustGenerateSyncPayload(t *testing.T, resourceType types.ResourceType, objs []runtime.Object) map[string]interface{} { - t.Helper() - data := make([]unstructured.Unstructured, 0, len(objs)) - for _, obj := range objs { - data = append(data, mustUnstructure(t, obj)) +func (tc testCase) testGenerateSyncPayload(t *testing.T) { + data := make([]interface{}, 0, len(tc.objs)) + for _, obj := range tc.objs { + data = append(data, obj) } - patches, err := generateSyncPayload(data, resourceType.Namespaced) + + patches, err := generateSyncPayload(data, tc.resourceType.Namespaced) if err != nil { t.Fatalf("Unexpected error: %v", err) } - return mustJSONRoundTrip(t, patches) + result := mustJSONRoundTrip(t, patches) + mustEqual(t, result, tc.expectedJson) +} + +func (tc *testCase) testRun(t *testing.T, scheme *runtime.Scheme) { + play := script{}. + OnPut("/", tc.expectedJson, nil, nil) + + mock := tc.play(t, scheme, tc.objs, play) + mustEqual(t, mock.PrefixList, []string{tc.prefix, tc.resourceType.Resource}) } -func mustRun(t *testing.T, scheme *runtime.Scheme, resourceType types.ResourceType, prefix string, objs []runtime.Object) map[string]interface{} { +func (tc *testCase) testAdd(t *testing.T, scheme *runtime.Scheme) { + obj := tc.objs[0] + play := script{}. + OnPut("/", nil, tc.mustCreate(t, obj), nil). + OnPut(mustKey(t, obj), mustJSONRoundTrip(t, obj), nil, nil) + + tc.play(t, scheme, nil, play) +} + +func (tc *testCase) testDelete(t *testing.T, scheme *runtime.Scheme) { + obj := tc.objs[0] + play := script{}. + OnPut("/", tc.expectedJson, tc.mustRemove(t, obj), nil). + OnPatch(mustKey(t, obj), nil, nil) + + tc.play(t, scheme, tc.objs, play) +} + +func (tc *testCase) testUpdate(t *testing.T, scheme *runtime.Scheme) { + change := &unstructured.Unstructured{Object: mustJSONRoundTrip(t, tc.objs[0])} + change.SetLabels(map[string]string{"test": "update"}) + change.SetResourceVersion("1") + + play := script{}. + OnPut("/", tc.expectedJson, tc.mustUpdate(t, change), nil). + OnPut(mustKey(t, change), change.Object, nil, nil) + + tc.play(t, scheme, tc.objs, play) +} +func (tc *testCase) testRetryAdd(t *testing.T, scheme *runtime.Scheme) { + play := script{}. + OnPut("/", tc.expectedJson, nil, errors.New("test fail update")). + OnPut("/", tc.expectedJson, nil, nil) + + tc.play(t, scheme, tc.objs, play) +} + +func (tc *testCase) testRetryUpdate(t *testing.T, scheme *runtime.Scheme) { + change := &unstructured.Unstructured{Object: mustJSONRoundTrip(t, tc.objs[0])} + change.SetLabels(map[string]string{"test": "update"}) + change.SetResourceVersion("1") + + play := script{}. + OnPut("/", tc.expectedJson, tc.mustUpdate(t, change), nil). + OnPut(mustKey(t, change), change.Object, nil, errors.New("Failed to update")). + OnPut("/", nil, nil, nil) + // don't check the payload on this last put, because we + // have removed an item so it no longer matches the tc.expectedJson + + tc.play(t, scheme, tc.objs, play) +} + +func (tc *testCase) testRetryDelete(t *testing.T, scheme *runtime.Scheme) { + obj := tc.objs[0] + play := script{}. + OnPut("/", tc.expectedJson, tc.mustRemove(t, obj), nil). + OnPatch(mustKey(t, obj), nil, errors.New("test Patch failed")). + OnPut("/", nil, nil, nil) + // don't check the payload on this last put, because we + // have removed an item so it no longer matches the tc.expectedJson + + tc.play(t, scheme, tc.objs, play) +} + +func (tc *testCase) mustGetResource(t *testing.T, client *fake.FakeDynamicClient, useNamespaceFrom runtime.Object) dynamic.ResourceInterface { t.Helper() - client := fake.NewSimpleDynamicClient(scheme, objs...) - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second)) - fake := &mockData{ - onUpdate: cancel, + nsr := client.Resource(mustGvr(tc.resourceType)) + if !tc.resourceType.Namespaced { + return nsr } - sync := NewFromInterface(client, fake.Prefix(prefix), resourceType) - sync.RunContext(ctx) - - // Check prefixes are stacked as expected, before returning - expectedPrefix := []string{prefix, resourceType.Resource} - if !reflect.DeepEqual(fake.prefix, expectedPrefix) { - t.Errorf("Expected prefix to be %s, got %s", expectedPrefix, fake.prefix) - t.Fail() + if useNamespaceFrom == nil { + return nsr.Namespace(metav1.NamespaceAll) } - return mustJSONRoundTrip(t, fake.data) + accessor := meta.NewAccessor() + ns, err := accessor.Namespace(useNamespaceFrom) + if err != nil { + t.Fatalf("Failed to get namespace from namespaced obj %v: %v", useNamespaceFrom, err) + } + return nsr.Namespace(ns) } -func mustMarshalJSON(t *testing.T, obj interface{}) []byte { - t.Helper() - bs, err := json.Marshal(obj) - if err != nil { - t.Fatalf("error marshalling JSON: %s", err) +func (tc *testCase) mustCreate(t *testing.T, obj runtime.Object) func(*fake.FakeDynamicClient) { + return func(client *fake.FakeDynamicClient) { + t.Helper() + + r := tc.mustGetResource(t, client, obj) + if _, err := r.Create(context.Background(), &unstructured.Unstructured{Object: mustJSONRoundTrip(t, obj)}, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create object %v: %v", obj, err) + } } - return bs } -func mustUnmarshalJSON(t *testing.T, bs []byte, v interface{}) { - t.Helper() - if len(bs) > 0 { - err := json.Unmarshal(bs, v) +func (tc *testCase) mustRemove(t *testing.T, obj runtime.Object) func(*fake.FakeDynamicClient) { + return func(client *fake.FakeDynamicClient) { + t.Helper() + + m, err := meta.Accessor(obj) if err != nil { - t.Fatalf("error unmarshalling JSON: %s", err) + t.Fatalf("Failed to build accessor for %v: %v", obj, err) + } + r := tc.mustGetResource(t, client, obj) + if err := r.Delete(context.Background(), m.GetName(), metav1.DeleteOptions{}); err != nil { + t.Fatalf("Failed to remove object %v: %v", obj, err) } } } -func mustJSONRoundTrip(t *testing.T, obj interface{}) map[string]interface{} { +func (tc *testCase) mustUpdate(t *testing.T, obj runtime.Object) func(*fake.FakeDynamicClient) { + return func(client *fake.FakeDynamicClient) { + t.Helper() + + r := tc.mustGetResource(t, client, obj) + if _, err := r.Update(context.Background(), obj.(*unstructured.Unstructured), metav1.UpdateOptions{}); err != nil { + t.Fatalf("Failed to create object %v: %v", obj, err) + } + } +} + +func (tc *testCase) play(t *testing.T, scheme *runtime.Scheme, objs []runtime.Object, play script) *mockData { t.Helper() - bs := mustMarshalJSON(t, obj) - var out map[string]interface{} - mustUnmarshalJSON(t, bs, &out) + client := fake.NewSimpleDynamicClient(scheme, objs...) + mock := &mockData{} + sync := NewFromInterface(client, mock.Prefix(tc.prefix), tc.resourceType, WithBackoff(0, 5*time.Second)) - return out + mock.Play(t, client, sync, play) + return mock } -func mustUnstructure(t *testing.T, obj runtime.Object) unstructured.Unstructured { +func mustKey(t *testing.T, obj runtime.Object) string { t.Helper() - current, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + + path, err := cache.MetaNamespaceKeyFunc(obj) if err != nil { - t.Fatalf("Failed to convert runtime Object to unstructured: %v", err) + t.Fatalf("Failed to get path from object %v: %v", obj, err) + } + return path +} + +func mustGvr(resourceType types.ResourceType) schema.GroupVersionResource { + return schema.GroupVersionResource{ + Group: resourceType.Group, + Version: resourceType.Version, + Resource: resourceType.Resource, } - return unstructured.Unstructured{Object: current} } diff --git a/pkg/data/mock.go b/pkg/data/mock.go index ff9b23a16..099cebf5d 100644 --- a/pkg/data/mock.go +++ b/pkg/data/mock.go @@ -1,39 +1,167 @@ package data import ( + "context" "encoding/json" + "errors" + "fmt" + "reflect" + "testing" + "time" opa_client "github.com/open-policy-agent/kube-mgmt/pkg/opa" + "k8s.io/client-go/dynamic/fake" ) // mockData emulates OPA Data Client API type mockData struct { - prefix []string - data interface{} - onUpdate func() // called after each PutData + PrefixList []string + // This function will be called by PutData and PatchData + actor func(op operation, path string, value interface{}) error +} + +// script is a sequence of steps to perform when data in mockData changes. +type script []step + +// OnPatch defines the action that will be triggered or the error that +// will be returned when the client gets a PatchData request next. +func (s script) OnPatch(expectedPath string, do func(client *fake.FakeDynamicClient), err error) script { + return append(s, step{ + operation: operationPatch, + expectedPath: expectedPath, + expectedData: nil, + action: do, + err: err, + }) +} + +// OnPut defines the action that will be triggered or the error that +// will be returned when the client gets a PutData request next. +func (s script) OnPut(expectedPath string, expectedData map[string]interface{}, do func(client *fake.FakeDynamicClient), err error) script { + return append(s, step{ + operation: operationPut, + expectedPath: expectedPath, + expectedData: expectedData, + action: do, + err: err, + }) +} + +type operation string + +const ( + operationPut operation = "PutData" + operationPatch operation = "PatchData" +) + +type step struct { + operation operation + expectedPath string + expectedData map[string]interface{} + action func(client *fake.FakeDynamicClient) + err error } // Prefix implements Data func (f *mockData) Prefix(path string) opa_client.Data { - f.prefix = append(f.prefix, path) + f.PrefixList = append(f.PrefixList, path) return f } // PatchData implements Data. Currently not supported. -func (*mockData) PatchData(string, string, *interface{}) error { - return nil +func (f *mockData) PatchData(path string, op string, value *interface{}) (err error) { + if op != "remove" { + return fmt.Errorf("unsupported operation %s", op) + } + return f.actor(operationPatch, path, nil) } // PutData implements Data -func (f *mockData) PutData(path string, value interface{}) error { - if f.onUpdate != nil { - defer f.onUpdate() - } - f.data = value - return nil +func (f *mockData) PutData(path string, value interface{}) (err error) { + return f.actor(operationPut, path, value) } +var errNotSupported = errors.New("PostData not supported") + // PostData implements Data. Currently not supported. func (*mockData) PostData(string, interface{}) (json.RawMessage, error) { - return nil, nil + return nil, errNotSupported +} + +// Run the script against a given +func (m *mockData) Play(t *testing.T, client *fake.FakeDynamicClient, sync *GenericSync, play script) { + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second)) + + cursor, additions, removals := 0, 0, 0 + m.actor = func(op operation, path string, value interface{}) error { + cue := play[cursor] + cursor++ + if cue.operation != op { + t.Fatalf("Expected operation %s at step %d, got %s", cue.operation, cursor, op) + } + if path != cue.expectedPath { + t.Logf("Expected path %s, got %s", cue.expectedPath, path) + t.Fail() + } + if cue.expectedData != nil { + data := mustJSONRoundTrip(t, value) + mustEqual(t, data, cue.expectedData) + } + if cue.action != nil { + cue.action(client) + } + if op == operationPut { + additions++ + } + if op == operationPatch { + removals++ + } + if cursor >= len(play) { + cancel() + } + return cue.err + } + sync.RunContext(ctx) + + if cursor < len(play) { + t.Fatalf("Expected %d operations, got %d", len(play), cursor) + } +} + +func mustMarshalJSON(t *testing.T, obj interface{}) []byte { + t.Helper() + bs, err := json.Marshal(obj) + if err != nil { + t.Fatalf("error marshalling JSON: %s", err) + } + return bs +} + +func mustUnmarshalJSON(t *testing.T, bs []byte, v interface{}) { + t.Helper() + if len(bs) > 0 { + err := json.Unmarshal(bs, v) + if err != nil { + t.Fatalf("error unmarshalling JSON: %s", err) + } + } +} + +func mustJSONRoundTrip(t *testing.T, obj interface{}) map[string]interface{} { + t.Helper() + + bs := mustMarshalJSON(t, obj) + var out map[string]interface{} + mustUnmarshalJSON(t, bs, &out) + + return out +} + +func mustEqual(t *testing.T, result, expected interface{}) { + t.Helper() + + if !reflect.DeepEqual(result, expected) { + t.Errorf("Sync payload expected:\n\n%q\n\nActual:\n\n%q\n", expected, result) + t.Fail() + } }