Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various #62

Merged
merged 3 commits into from
Jan 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions clientbase/object_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (p *ObjectClient) Create(o runtime.Object) (runtime.Object, error) {
return result, err
}

func (p *ObjectClient) GetNamespace(name, namespace string, opts metav1.GetOptions) (runtime.Object, error) {
func (p *ObjectClient) GetNamespaced(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) {
result := p.Factory.Object()
req := p.restClient.Get().
Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version)
Expand Down Expand Up @@ -145,7 +145,7 @@ func (p *ObjectClient) Update(name string, o runtime.Object) (runtime.Object, er
return result, err
}

func (p *ObjectClient) DeleteNamespace(name, namespace string, opts *metav1.DeleteOptions) error {
func (p *ObjectClient) DeleteNamespaced(namespace, name string, opts *metav1.DeleteOptions) error {
req := p.restClient.Delete().
Prefix(p.getAPIPrefix(), p.gvk.Group, p.gvk.Version)
if namespace != "" {
Expand Down
12 changes: 6 additions & 6 deletions generator/controller_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ type {{.schema.CodeName}}Controller interface {
type {{.schema.CodeName}}Interface interface {
ObjectClient() *clientbase.ObjectClient
Create(*{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error)
GetNamespace(name, namespace string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error)
GetNamespaced(namespace, name string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error)
Get(name string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error)
Update(*{{.prefix}}{{.schema.CodeName}}) (*{{.prefix}}{{.schema.CodeName}}, error)
Delete(name string, options *metav1.DeleteOptions) error
DeleteNamespace(name, namespace string, options *metav1.DeleteOptions) error
DeleteNamespaced(namespace, name string, options *metav1.DeleteOptions) error
List(opts metav1.ListOptions) (*{{.schema.CodeName}}List, error)
Watch(opts metav1.ListOptions) (watch.Interface, error)
DeleteCollection(deleteOpts *metav1.DeleteOptions, listOpts metav1.ListOptions) error
Expand Down Expand Up @@ -203,8 +203,8 @@ func (s *{{.schema.ID}}Client) Get(name string, opts metav1.GetOptions) (*{{.pre
return obj.(*{{.prefix}}{{.schema.CodeName}}), err
}

func (s *{{.schema.ID}}Client) GetNamespace(name, namespace string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error) {
obj, err := s.objectClient.GetNamespace(name, namespace, opts)
func (s *{{.schema.ID}}Client) GetNamespaced(namespace, name string, opts metav1.GetOptions) (*{{.prefix}}{{.schema.CodeName}}, error) {
obj, err := s.objectClient.GetNamespaced(namespace, name, opts)
return obj.(*{{.prefix}}{{.schema.CodeName}}), err
}

Expand All @@ -217,8 +217,8 @@ func (s *{{.schema.ID}}Client) Delete(name string, options *metav1.DeleteOptions
return s.objectClient.Delete(name, options)
}

func (s *{{.schema.ID}}Client) DeleteNamespace(name, namespace string, options *metav1.DeleteOptions) error {
return s.objectClient.DeleteNamespace(name, namespace, options)
func (s *{{.schema.ID}}Client) DeleteNamespaced(namespace, name string, options *metav1.DeleteOptions) error {
return s.objectClient.DeleteNamespaced(namespace, name, options)
}

func (s *{{.schema.ID}}Client) List(opts metav1.ListOptions) (*{{.schema.CodeName}}List, error) {
Expand Down
2 changes: 1 addition & 1 deletion offspring/offspring.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (w *Reconciliation) deleteChild(reference ObjectReference, object runtime.O
}

policy := metav1.DeletePropagationForeground
return childWatcher.ObjectClient.DeleteNamespace(reference.Name, reference.Namespace, &metav1.DeleteOptions{
return childWatcher.ObjectClient.DeleteNamespaced(reference.Namespace, reference.Name, &metav1.DeleteOptions{
PropagationPolicy: &policy,
})
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/subscribe/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package subscribe
import (
"bytes"
"context"

"errors"
"time"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -124,8 +124,8 @@ func handler(apiContext *types.APIContext) error {
}
}

// Group is already done at this point because of goroutine above, this is just to send the error if needed
return readerGroup.Wait()
// no point in ever returning null because the connection is hijacked and we can't write it
return nil
}

func writeData(c *websocket.Conn, header string, buf []byte) error {
Expand Down Expand Up @@ -158,7 +158,7 @@ func streamStore(ctx context.Context, eg *errgroup.Group, apiContext *types.APIC
result <- e
}

return nil
return errors.New("disconnect")
})
}

Expand Down
2 changes: 1 addition & 1 deletion store/crd/crd_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *Store) AddSchemas(ctx context.Context, schemas ...*types.Schema) error
}

for schema, crd := range schemaStatus {
c.schemaStores[key(schema)] = proxy.NewProxyStoreForCRD(c.k8sClient,
c.schemaStores[key(schema)] = proxy.NewProxyStore(c.k8sClient,
[]string{"apis"},
crd.Spec.Group,
crd.Spec.Version,
Expand Down
69 changes: 21 additions & 48 deletions store/proxy/proxy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
patchtype "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
Expand All @@ -37,33 +36,12 @@ type Store struct {
kind string
resourcePlural string
authContext map[string]string
supportPatch bool
}

func NewProxyStore(k8sClient rest.Interface,
prefix []string, group, version, kind, resourcePlural string) types.Store {
return &errorStore{
Store: &Store{
supportPatch: true,
k8sClient: k8sClient,
prefix: prefix,
group: group,
version: version,
kind: kind,
resourcePlural: resourcePlural,
authContext: map[string]string{
"apiGroup": group,
"resource": resourcePlural,
},
},
}
}

func NewProxyStoreForCRD(k8sClient rest.Interface,
prefix []string, group, version, kind, resourcePlural string) types.Store {
return &errorStore{
Store: &Store{
supportPatch: false,
k8sClient: k8sClient,
prefix: prefix,
group: group,
Expand Down Expand Up @@ -142,9 +120,12 @@ func (p *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *ty
func (p *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) (chan map[string]interface{}, error) {
namespace := getNamespace(apiContext, opt)

timeout := int64(60 * 60)
req := p.common(namespace, p.k8sClient.Get())
req.VersionedParams(&metav1.ListOptions{
Watch: true,
Watch: true,
TimeoutSeconds: &timeout,
ResourceVersion: "0",
}, dynamic.VersionedParameterEncoderWithV1Fallback)

body, err := req.Stream()
Expand Down Expand Up @@ -238,38 +219,23 @@ func (p *Store) toInternal(mapper types.Mapper, data map[string]interface{}) {
}

func (p *Store) Update(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}, id string) (map[string]interface{}, error) {
if p.supportPatch {
p.toInternal(schema.Mapper, data)
namespace, id := splitID(id)

req := p.common(namespace, p.k8sClient.Patch(patchtype.StrategicMergePatchType)).
Body(&unstructured.Unstructured{
Object: data,
}).
Name(id).
SetHeader("Content-Type", string(patchtype.StrategicMergePatchType))

_, result, err := p.singleResult(apiContext, schema, req)
return result, err
}
namespace, id := splitID(id)
req := p.common(namespace, p.k8sClient.Get()).
Name(id)

resourceVersion, existing, err := p.byID(apiContext, schema, id)
resourceVersion, existing, err := p.singleResultRaw(apiContext, schema, req)
if err != nil {
return data, nil
}

for k, v := range data {
existing[k] = v
}

p.toInternal(schema.Mapper, existing)
namespace, id := splitID(id)
p.toInternal(schema.Mapper, data)
existing = convert.APIUpdateMerge(existing, data, apiContext.Query.Get("_replace") == "true")

values.PutValue(existing, resourceVersion, "metadata", "resourceVersion")
values.PutValue(existing, namespace, "metadata", "namespace")
values.PutValue(existing, id, "metadata", "name")

req := p.common(namespace, p.k8sClient.Put()).
req = p.common(namespace, p.k8sClient.Put()).
Body(&unstructured.Unstructured{
Object: existing,
}).
Expand Down Expand Up @@ -302,15 +268,22 @@ func (p *Store) Delete(apiContext *types.APIContext, schema *types.Schema, id st
}

func (p *Store) singleResult(apiContext *types.APIContext, schema *types.Schema, req *rest.Request) (string, map[string]interface{}, error) {
version, data, err := p.singleResultRaw(apiContext, schema, req)
if err != nil {
return "", nil, err
}
p.fromInternal(schema, data)
return version, data, nil
}

func (p *Store) singleResultRaw(apiContext *types.APIContext, schema *types.Schema, req *rest.Request) (string, map[string]interface{}, error) {
result := &unstructured.Unstructured{}
err := p.doAuthed(apiContext, req).Into(result)
if err != nil {
return "", nil, err
}

version := result.GetResourceVersion()
p.fromInternal(schema, result.Object)
return version, result.Object, nil
return result.GetResourceVersion(), result.Object, nil
}

func splitID(id string) (string, string) {
Expand Down
3 changes: 3 additions & 0 deletions types/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
)

func Chan(c <-chan map[string]interface{}, f func(map[string]interface{}) map[string]interface{}) chan map[string]interface{} {
if c == nil {
return nil
}
result := make(chan map[string]interface{})
go func() {
for data := range c {
Expand Down
88 changes: 88 additions & 0 deletions types/convert/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package convert

import (
"strings"
)

func APIUpdateMerge(dest, src map[string]interface{}, replace bool) map[string]interface{} {
result := map[string]interface{}{}
if replace {
if status, ok := dest["status"]; ok {
result["status"] = status
}
if metadata, ok := dest["metadata"]; ok {
result["metadata"] = metadata
}
} else {
result = copyMap(dest)
}

for k, v := range src {
if k == "metadata" {
result["metadata"] = mergeMetadata(ToMapInterface(dest["metadata"]), ToMapInterface(v))
} else if k == "status" {
continue
}

existing, ok := dest[k]
if ok && !replace {
result[k] = merge(existing, v)
} else {
result[k] = v
}
}

return result
}

func mergeMetadata(dest map[string]interface{}, src map[string]interface{}) map[string]interface{} {
result := copyMap(dest)

labels := mergeMaps(ToMapInterface(dest["labels"]), ToMapInterface(src["labels"]))

existingAnnotation := ToMapInterface(dest["annotations"])
newAnnotation := ToMapInterface(src["annotations"])
annotations := copyMap(existingAnnotation)

for k, v := range newAnnotation {
if strings.Contains(k, "cattle.io/") {
continue
}
annotations[k] = v
}
for k, v := range existingAnnotation {
if strings.Contains(k, "cattle.io/") {
annotations[k] = v
}
}

result["labels"] = labels
result["annotations"] = annotations

return result
}

func merge(dest, src interface{}) interface{} {
sm, smOk := src.(map[string]interface{})
dm, dmOk := dest.(map[string]interface{})
if smOk && dmOk {
return mergeMaps(dm, sm)
}
return src
}

func mergeMaps(dest map[string]interface{}, src map[string]interface{}) interface{} {
result := copyMap(dest)
for k, v := range src {
result[k] = merge(dest[k], v)
}
return result
}

func copyMap(src map[string]interface{}) map[string]interface{} {
result := map[string]interface{}{}
for k, v := range src {
result[k] = v
}
return result
}