diff --git a/runtime/kubernetes/container.go b/runtime/kubernetes/container.go index 87f68cef..d40c1a9c 100644 --- a/runtime/kubernetes/container.go +++ b/runtime/kubernetes/container.go @@ -19,6 +19,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" ) @@ -311,7 +312,7 @@ func (c *client) WaitContainer(ctx context.Context, ctn *pipeline.Container) err c.Logger.Tracef("waiting for container %s", ctn.ID) // create label selector for watching the pod - selector := fmt.Sprintf("pipeline=%s", c.Pod.ObjectMeta.Name) + selector := fmt.Sprintf("pipeline=%s", fields.EscapeValue(c.Pod.ObjectMeta.Name)) // create options for watching the container opts := metav1.ListOptions{ @@ -325,16 +326,18 @@ func (c *client) WaitContainer(ctx context.Context, ctn *pipeline.Container) err // -> // https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#Interface // nolint: contextcheck // ignore non-inherited new context - watch, err := c.Kubernetes.CoreV1().Pods(c.config.Namespace).Watch(context.Background(), opts) + podWatch, err := c.Kubernetes.CoreV1().Pods(c.config.Namespace).Watch(context.Background(), opts) if err != nil { return err } + defer podWatch.Stop() + for { // capture new result from the channel // // https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#Interface - result := <-watch.ResultChan() + result := <-podWatch.ResultChan() // convert the object from the result to a pod pod, ok := result.Object.(*v1.Pod) diff --git a/runtime/kubernetes/container_test.go b/runtime/kubernetes/container_test.go index 07728893..a7c38442 100644 --- a/runtime/kubernetes/container_test.go +++ b/runtime/kubernetes/container_test.go @@ -13,9 +13,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes/fake" - testcore "k8s.io/client-go/testing" ) func TestKubernetes_InspectContainer(t *testing.T) { @@ -225,35 +222,6 @@ func TestKubernetes_TailContainer(t *testing.T) { } func TestKubernetes_WaitContainer(t *testing.T) { - // setup types - _engine, err := NewMock(_pod) - if err != nil { - t.Errorf("unable to create runtime engine: %v", err) - } - - // create a new fake kubernetes client - // - // https://pkg.go.dev/k8s.io/client-go/kubernetes/fake?tab=doc#NewSimpleClientset - _kubernetes := fake.NewSimpleClientset(_pod) - - // create a new fake watcher - // - // https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#NewFake - _watch := watch.NewFake() - - // create a new watch reactor with the fake watcher - // - // https://pkg.go.dev/k8s.io/client-go/testing?tab=doc#DefaultWatchReactor - reactor := testcore.DefaultWatchReactor(_watch, nil) - - // add watch reactor to beginning of the client chain - // - // https://pkg.go.dev/k8s.io/client-go/testing?tab=doc#Fake.PrependWatchReactor - _kubernetes.PrependWatchReactor("pods", reactor) - - // overwrite the mock kubernetes client - _engine.Kubernetes = _kubernetes - // setup tests tests := []struct { failure bool @@ -314,12 +282,18 @@ func TestKubernetes_WaitContainer(t *testing.T) { // run tests for _, test := range tests { + // setup types + _engine, _watch, err := newMockWithWatch(_pod, "pods") + if err != nil { + t.Errorf("unable to create runtime engine: %v", err) + } + go func() { // simulate adding a pod to the watcher _watch.Add(test.object) }() - err := _engine.WaitContainer(context.Background(), test.container) + err = _engine.WaitContainer(context.Background(), test.container) if test.failure { if err == nil { diff --git a/runtime/kubernetes/kubernetes_test.go b/runtime/kubernetes/kubernetes_test.go index 45d08068..f479cad5 100644 --- a/runtime/kubernetes/kubernetes_test.go +++ b/runtime/kubernetes/kubernetes_test.go @@ -11,6 +11,9 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" + testcore "k8s.io/client-go/testing" ) func TestKubernetes_New(t *testing.T) { @@ -314,3 +317,39 @@ var ( }, } ) + +// newMockWithWatch returns an Engine implementation that +// integrates with a Kubernetes runtime and a FakeWatcher +// that can be used to inject resource events into it. +func newMockWithWatch(pod *v1.Pod, watchResource string, opts ...ClientOpt) (*client, *watch.RaceFreeFakeWatcher, error) { + // setup types + _engine, err := NewMock(pod, opts...) + if err != nil { + return nil, nil, err + } + + // create a new fake kubernetes client + // + // https://pkg.go.dev/k8s.io/client-go/kubernetes/fake?tab=doc#NewSimpleClientset + _kubernetes := fake.NewSimpleClientset(pod) + + // create a new fake watcher + // + // https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#NewRaceFreeFake + _watch := watch.NewRaceFreeFake() + + // create a new watch reactor with the fake watcher + // + // https://pkg.go.dev/k8s.io/client-go/testing?tab=doc#DefaultWatchReactor + reactor := testcore.DefaultWatchReactor(_watch, nil) + + // add watch reactor to beginning of the client chain + // + // https://pkg.go.dev/k8s.io/client-go/testing?tab=doc#Fake.PrependWatchReactor + _kubernetes.PrependWatchReactor(watchResource, reactor) + + // overwrite the mock kubernetes client + _engine.Kubernetes = _kubernetes + + return _engine, _watch, nil +}