diff --git a/clientbase/object_client.go b/clientbase/object_client.go index 11ef9fcbe..e68f14d05 100644 --- a/clientbase/object_client.go +++ b/clientbase/object_client.go @@ -95,7 +95,7 @@ func (p *ObjectClient) Create(o runtime.Object) (runtime.Object, error) { return result, err } -func (p *ObjectClient) GetNamespace(name, namespace string, opts metav1.GetOptions) (runtime.Object, error) { +func (p *ObjectClient) GetNamespaced(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) { result := p.Factory.Object() req := p.restClient.Get(). Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version) @@ -145,7 +145,7 @@ func (p *ObjectClient) Update(name string, o runtime.Object) (runtime.Object, er return result, err } -func (p *ObjectClient) DeleteNamespace(name, namespace string, opts *metav1.DeleteOptions) error { +func (p *ObjectClient) DeleteNamespaced(namespace, name string, opts *metav1.DeleteOptions) error { req := p.restClient.Delete(). Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version) if namespace != "" { diff --git a/generator/controller_template.go b/generator/controller_template.go index 12132e00b..62dd20ea4 100644 --- a/generator/controller_template.go +++ b/generator/controller_template.go @@ -61,11 +61,11 @@ type {{.schema.CodeName}}Controller interface { type {{.schema.CodeName}}Interface interface { ObjectClient() *clientbase.ObjectClient Create(*{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) - GetNamespace(name, namespace string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error) + GetNamespaced(namespace, name string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error) Get(name string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error) Update(*{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error) Delete(name string, options *metav1.DeleteOptions) error - DeleteNamespace(name, namespace string, options *metav1.DeleteOptions) error + DeleteNamespaced(namespace, name string, options *metav1.DeleteOptions) error List(opts metav1.ListOptions) (*{{.schema.CodeName}}List, error) Watch(opts metav1.ListOptions) (watch.Interface, error) DeleteCollection(deleteOpts *metav1.DeleteOptions, listOpts metav1.ListOptions) error @@ -203,8 +203,8 @@ func (s *{{.schema.ID}}Client) Get(name string, opts metav1.GetOptions) (*{{.pre return obj.(*{{.prefix}}{{.schema.CodeName}}), err } -func (s *{{.schema.ID}}Client) GetNamespace(name, namespace string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error) { - obj, err := s.objectClient.GetNamespace(name, namespace, opts) +func (s *{{.schema.ID}}Client) GetNamespaced(namespace, name string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error) { + obj, err := s.objectClient.GetNamespaced(namespace, name, opts) return obj.(*{{.prefix}}{{.schema.CodeName}}), err } @@ -217,8 +217,8 @@ func (s *{{.schema.ID}}Client) Delete(name string, options *metav1.DeleteOptions return s.objectClient.Delete(name, options) } -func (s *{{.schema.ID}}Client) DeleteNamespace(name, namespace string, options *metav1.DeleteOptions) error { - return s.objectClient.DeleteNamespace(name, namespace, options) +func (s *{{.schema.ID}}Client) DeleteNamespaced(namespace, name string, options *metav1.DeleteOptions) error { + return s.objectClient.DeleteNamespaced(namespace, name, options) } func (s *{{.schema.ID}}Client) List(opts metav1.ListOptions) (*{{.schema.CodeName}}List, error) { diff --git a/offspring/offspring.go b/offspring/offspring.go index a870c6bf0..d16b3e3ce 100644 --- a/offspring/offspring.go +++ b/offspring/offspring.go @@ -358,7 +358,7 @@ func (w *Reconciliation) deleteChild(reference ObjectReference, object runtime.O } policy := metav1.DeletePropagationForeground - return childWatcher.ObjectClient.DeleteNamespace(reference.Name, reference.Namespace, &metav1.DeleteOptions{ + return childWatcher.ObjectClient.DeleteNamespaced(reference.Namespace, reference.Name, &metav1.DeleteOptions{ PropagationPolicy: &policy, }) } diff --git a/pkg/subscribe/handler.go b/pkg/subscribe/handler.go index 9d84aa083..0aa3e703a 100644 --- a/pkg/subscribe/handler.go +++ b/pkg/subscribe/handler.go @@ -3,7 +3,7 @@ package subscribe import ( "bytes" "context" - + "errors" "time" "github.com/gorilla/websocket" @@ -124,8 +124,8 @@ func handler(apiContext *types.APIContext) error { } } - // Group is already done at this point because of goroutine above, this is just to send the error if needed - return readerGroup.Wait() + // no point in ever returning null because the connection is hijacked and we can't write it + return nil } func writeData(c *websocket.Conn, header string, buf []byte) error { @@ -158,7 +158,7 @@ func streamStore(ctx context.Context, eg *errgroup.Group, apiContext *types.APIC result <- e } - return nil + return errors.New("disconnect") }) } diff --git a/store/crd/crd_store.go b/store/crd/crd_store.go index 91da62f3d..dc6f26736 100644 --- a/store/crd/crd_store.go +++ b/store/crd/crd_store.go @@ -119,7 +119,7 @@ func (c *Store) AddSchemas(ctx context.Context, schemas ...*types.Schema) error } for schema, crd := range schemaStatus { - c.schemaStores[key(schema)] = proxy.NewProxyStoreForCRD(c.k8sClient, + c.schemaStores[key(schema)] = proxy.NewProxyStore(c.k8sClient, []string{"apis"}, crd.Spec.Group, crd.Spec.Version, diff --git a/store/proxy/proxy_store.go b/store/proxy/proxy_store.go index 334d5da49..7a6901295 100644 --- a/store/proxy/proxy_store.go +++ b/store/proxy/proxy_store.go @@ -14,7 +14,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer/json" "k8s.io/apimachinery/pkg/runtime/serializer/streaming" - patchtype "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" @@ -37,33 +36,12 @@ type Store struct { kind string resourcePlural string authContext map[string]string - supportPatch bool } func NewProxyStore(k8sClient rest.Interface, prefix []string, group, version, kind, resourcePlural string) types.Store { return &errorStore{ Store: &Store{ - supportPatch: true, - k8sClient: k8sClient, - prefix: prefix, - group: group, - version: version, - kind: kind, - resourcePlural: resourcePlural, - authContext: map[string]string{ - "apiGroup": group, - "resource": resourcePlural, - }, - }, - } -} - -func NewProxyStoreForCRD(k8sClient rest.Interface, - prefix []string, group, version, kind, resourcePlural string) types.Store { - return &errorStore{ - Store: &Store{ - supportPatch: false, k8sClient: k8sClient, prefix: prefix, group: group, @@ -142,9 +120,12 @@ func (p *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *ty func (p *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) (chan map[string]interface{}, error) { namespace := getNamespace(apiContext, opt) + timeout := int64(60 * 60) req := p.common(namespace, p.k8sClient.Get()) req.VersionedParams(&metav1.ListOptions{ - Watch: true, + Watch: true, + TimeoutSeconds: &timeout, + ResourceVersion: "0", }, dynamic.VersionedParameterEncoderWithV1Fallback) body, err := req.Stream() @@ -238,38 +219,23 @@ func (p *Store) toInternal(mapper types.Mapper, data map[string]interface{}) { } func (p *Store) Update(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}, id string) (map[string]interface{}, error) { - if p.supportPatch { - p.toInternal(schema.Mapper, data) - namespace, id := splitID(id) - - req := p.common(namespace, p.k8sClient.Patch(patchtype.StrategicMergePatchType)). - Body(&unstructured.Unstructured{ - Object: data, - }). - Name(id). - SetHeader("Content-Type", string(patchtype.StrategicMergePatchType)) - - _, result, err := p.singleResult(apiContext, schema, req) - return result, err - } + namespace, id := splitID(id) + req := p.common(namespace, p.k8sClient.Get()). + Name(id) - resourceVersion, existing, err := p.byID(apiContext, schema, id) + resourceVersion, existing, err := p.singleResultRaw(apiContext, schema, req) if err != nil { return data, nil } - for k, v := range data { - existing[k] = v - } - - p.toInternal(schema.Mapper, existing) - namespace, id := splitID(id) + p.toInternal(schema.Mapper, data) + existing = convert.APIUpdateMerge(existing, data, apiContext.Query.Get("_replace") == "true") values.PutValue(existing, resourceVersion, "metadata", "resourceVersion") values.PutValue(existing, namespace, "metadata", "namespace") values.PutValue(existing, id, "metadata", "name") - req := p.common(namespace, p.k8sClient.Put()). + req = p.common(namespace, p.k8sClient.Put()). Body(&unstructured.Unstructured{ Object: existing, }). @@ -302,15 +268,22 @@ func (p *Store) Delete(apiContext *types.APIContext, schema *types.Schema, id st } func (p *Store) singleResult(apiContext *types.APIContext, schema *types.Schema, req *rest.Request) (string, map[string]interface{}, error) { + version, data, err := p.singleResultRaw(apiContext, schema, req) + if err != nil { + return "", nil, err + } + p.fromInternal(schema, data) + return version, data, nil +} + +func (p *Store) singleResultRaw(apiContext *types.APIContext, schema *types.Schema, req *rest.Request) (string, map[string]interface{}, error) { result := &unstructured.Unstructured{} err := p.doAuthed(apiContext, req).Into(result) if err != nil { return "", nil, err } - version := result.GetResourceVersion() - p.fromInternal(schema, result.Object) - return version, result.Object, nil + return result.GetResourceVersion(), result.Object, nil } func splitID(id string) (string, string) { diff --git a/types/convert/convert.go b/types/convert/convert.go index c214ad495..2fb14d3ba 100644 --- a/types/convert/convert.go +++ b/types/convert/convert.go @@ -11,6 +11,9 @@ import ( ) func Chan(c <-chan map[string]interface{}, f func(map[string]interface{}) map[string]interface{}) chan map[string]interface{} { + if c == nil { + return nil + } result := make(chan map[string]interface{}) go func() { for data := range c { diff --git a/types/convert/merge.go b/types/convert/merge.go new file mode 100644 index 000000000..5f4d30eb8 --- /dev/null +++ b/types/convert/merge.go @@ -0,0 +1,88 @@ +package convert + +import ( + "strings" +) + +func APIUpdateMerge(dest, src map[string]interface{}, replace bool) map[string]interface{} { + result := map[string]interface{}{} + if replace { + if status, ok := dest["status"]; ok { + result["status"] = status + } + if metadata, ok := dest["metadata"]; ok { + result["metadata"] = metadata + } + } else { + result = copyMap(dest) + } + + for k, v := range src { + if k == "metadata" { + result["metadata"] = mergeMetadata(ToMapInterface(dest["metadata"]), ToMapInterface(v)) + } else if k == "status" { + continue + } + + existing, ok := dest[k] + if ok && !replace { + result[k] = merge(existing, v) + } else { + result[k] = v + } + } + + return result +} + +func mergeMetadata(dest map[string]interface{}, src map[string]interface{}) map[string]interface{} { + result := copyMap(dest) + + labels := mergeMaps(ToMapInterface(dest["labels"]), ToMapInterface(src["labels"])) + + existingAnnotation := ToMapInterface(dest["annotations"]) + newAnnotation := ToMapInterface(src["annotations"]) + annotations := copyMap(existingAnnotation) + + for k, v := range newAnnotation { + if strings.Contains(k, "cattle.io/") { + continue + } + annotations[k] = v + } + for k, v := range existingAnnotation { + if strings.Contains(k, "cattle.io/") { + annotations[k] = v + } + } + + result["labels"] = labels + result["annotations"] = annotations + + return result +} + +func merge(dest, src interface{}) interface{} { + sm, smOk := src.(map[string]interface{}) + dm, dmOk := dest.(map[string]interface{}) + if smOk && dmOk { + return mergeMaps(dm, sm) + } + return src +} + +func mergeMaps(dest map[string]interface{}, src map[string]interface{}) interface{} { + result := copyMap(dest) + for k, v := range src { + result[k] = merge(dest[k], v) + } + return result +} + +func copyMap(src map[string]interface{}) map[string]interface{} { + result := map[string]interface{}{} + for k, v := range src { + result[k] = v + } + return result +}