Skip to content

Commit

Permalink
Avoid reporting context cancellation or deadline expiration as resour…
Browse files Browse the repository at this point in the history
…ce error
  • Loading branch information
mortent committed Feb 9, 2022
1 parent 3f2bc6a commit 3f86e78
Show file tree
Hide file tree
Showing 18 changed files with 439 additions and 193 deletions.
12 changes: 9 additions & 3 deletions pkg/kstatus/polling/clusterreader/caching_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ package clusterreader

import (
"context"
"errors"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -169,7 +170,7 @@ func (c *CachingClusterReader) Get(_ context.Context, key client.ObjectKey, obj
return nil
}
}
return errors.NewNotFound(mapping.Resource.GroupResource(), key.Name)
return apierrors.NewNotFound(mapping.Resource.GroupResource(), key.Name)
}

// ListNamespaceScoped lists all resource identifier by the GVK of the list, the namespace and the selector
Expand Down Expand Up @@ -239,7 +240,12 @@ func (c *CachingClusterReader) Sync(ctx context.Context) error {
list.SetGroupVersionKind(mapping.GroupVersionKind)
err = c.reader.List(ctx, &list, listOptions...)
if err != nil {
// We continue even if there is an error. Whenever any pollers
// If the context was cancelled, we just stop the work and return
// the error.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return err
}
// For other errors, we just keep it the error. Whenever any pollers
// request a resource covered by this gns, we just return the
// error.
cache[gn] = cacheEntry{
Expand Down
66 changes: 66 additions & 0 deletions pkg/kstatus/polling/clusterreader/fake/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0

package fake

import (
"context"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type FakeClusterReader struct {
NoopClusterReader

GetResource *unstructured.Unstructured
GetErr error

ListResources *unstructured.UnstructuredList
ListErr error

SyncErr error
}

func (f *FakeClusterReader) Get(_ context.Context, _ client.ObjectKey, u *unstructured.Unstructured) error {
if f.GetResource != nil {
u.Object = f.GetResource.Object
}
return f.GetErr
}

func (f *FakeClusterReader) ListNamespaceScoped(_ context.Context, list *unstructured.UnstructuredList, _ string, _ labels.Selector) error {
if f.ListResources != nil {
list.Items = f.ListResources.Items
}
return f.ListErr
}

func (f *FakeClusterReader) Sync(_ context.Context) error {
return f.SyncErr
}

func NewNoopClusterReader() *NoopClusterReader {
return &NoopClusterReader{}
}

type NoopClusterReader struct{}

func (n *NoopClusterReader) Get(_ context.Context, _ client.ObjectKey, _ *unstructured.Unstructured) error {
return nil
}

func (n *NoopClusterReader) ListNamespaceScoped(_ context.Context, _ *unstructured.UnstructuredList,
_ string, _ labels.Selector) error {
return nil
}

func (n *NoopClusterReader) ListClusterScoped(_ context.Context, _ *unstructured.UnstructuredList,
_ labels.Selector) error {
return nil
}

func (n *NoopClusterReader) Sync(_ context.Context) error {
return nil
}
43 changes: 31 additions & 12 deletions pkg/kstatus/polling/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package engine

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -167,10 +168,7 @@ func (r *statusPollerRunner) Run() {

err := r.syncAndPoll()
if err != nil {
r.eventChannel <- event.Event{
EventType: event.ErrorEvent,
Error: err,
}
r.handleSyncAndPollErr(err)
return
}

Expand All @@ -182,16 +180,28 @@ func (r *statusPollerRunner) Run() {
// First sync and then compute status for all resources.
err := r.syncAndPoll()
if err != nil {
r.eventChannel <- event.Event{
EventType: event.ErrorEvent,
Error: err,
}
r.handleSyncAndPollErr(err)
return
}
}
}
}

// handleSyncAndPollErr decides what to do if we encounter an error while
// fetching resources to compute status. Errors are usually returned
// as an ErrorEvent, but we handle context cancellation or deadline exceeded
// differently since they aren't really errors, but a signal that the
// process should shut down.
func (r *statusPollerRunner) handleSyncAndPollErr(err error) {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
r.eventChannel <- event.Event{
EventType: event.ErrorEvent,
Error: err,
}
}

func (r *statusPollerRunner) syncAndPoll() error {
// First trigger a sync of the ClusterReader. This may or may not actually
// result in calls to the cluster, depending on the implementation.
Expand All @@ -204,17 +214,25 @@ func (r *statusPollerRunner) syncAndPoll() error {
// Poll all resources and compute status. If the polling of resources has completed (based
// on information from the StatusAggregator and the value of pollUntilCancelled), we send
// a CompletedEvent and return.
r.pollStatusForAllResources()
return nil
return r.pollStatusForAllResources()
}

// pollStatusForAllResources iterates over all the resources in the set and delegates
// to the appropriate engine to compute the status.
func (r *statusPollerRunner) pollStatusForAllResources() {
func (r *statusPollerRunner) pollStatusForAllResources() error {
for _, id := range r.identifiers {
// Check if the context has been cancelled on every iteration.
select {
case <-r.ctx.Done():
return r.ctx.Err()
default:
}
gk := id.GroupKind
statusReader := r.statusReaderForGroupKind(gk)
resourceStatus := statusReader.ReadStatus(r.ctx, r.clusterReader, id)
resourceStatus, err := statusReader.ReadStatus(r.ctx, r.clusterReader, id)
if err != nil {
return err
}
if r.isUpdatedResourceStatus(resourceStatus) {
r.previousResourceStatuses[id] = resourceStatus
r.eventChannel <- event.Event{
Expand All @@ -223,6 +241,7 @@ func (r *statusPollerRunner) pollStatusForAllResources() {
}
}
}
return nil
}

func (r *statusPollerRunner) statusReaderForGroupKind(gk schema.GroupKind) StatusReader {
Expand Down
79 changes: 71 additions & 8 deletions pkg/kstatus/polling/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
fakecr "sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader/fake"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/testutil"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
fakemapper "sigs.k8s.io/cli-utils/pkg/testutil"
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestStatusPollerRunner(t *testing.T) {
DefaultStatusReader: tc.defaultStatusReader,
StatusReaders: []StatusReader{},
ClusterReaderFactory: ClusterReaderFactoryFunc(func(client.Reader, meta.RESTMapper, object.ObjMetadataSet) (ClusterReader, error) {
return testutil.NewNoopClusterReader(), nil
return fakecr.NewNoopClusterReader(), nil
}),
}

Expand Down Expand Up @@ -146,7 +146,7 @@ func TestNewStatusPollerRunnerCancellation(t *testing.T) {

engine := PollerEngine{
ClusterReaderFactory: ClusterReaderFactoryFunc(func(client.Reader, meta.RESTMapper, object.ObjMetadataSet) (ClusterReader, error) {
return testutil.NewNoopClusterReader(), nil
return fakecr.NewNoopClusterReader(), nil
}),
}

Expand All @@ -168,6 +168,69 @@ func TestNewStatusPollerRunnerCancellation(t *testing.T) {
}
}

func TestNewStatusPollerRunnerCancellationWithMultipleResources(t *testing.T) {
identifiers := object.ObjMetadataSet{
{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "Deployment",
},
Name: "foo",
Namespace: "default",
},
{
GroupKind: schema.GroupKind{
Group: "apps",
Kind: "StatefulSet",
},
Name: "bar",
Namespace: "default",
},
}

fakeMapper := fakemapper.NewFakeRESTMapper(
appsv1.SchemeGroupVersion.WithKind("Deployment"),
appsv1.SchemeGroupVersion.WithKind("StatefulSet"),
)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

timer := time.NewTimer(10 * time.Second)

engine := PollerEngine{
Mapper: fakeMapper,
ClusterReaderFactory: ClusterReaderFactoryFunc(func(client.Reader, meta.RESTMapper, object.ObjMetadataSet) (ClusterReader, error) {
return &fakecr.FakeClusterReader{
SyncErr: context.Canceled,
}, nil
}),
}

options := Options{
PollInterval: 1 * time.Second,
}

eventChannel := engine.Poll(ctx, identifiers, options)

var events []event.Event
loop:
for {
select {
case e, ok := <-eventChannel:
if !ok {
timer.Stop()
break loop
}
events = append(events, e)
case <-timer.C:
t.Errorf("expected runner to time out, but it didn't")
return
}
}
assert.Equal(t, 0, len(events))
}

func TestNewStatusPollerRunnerIdentifierValidation(t *testing.T) {
identifiers := object.ObjMetadataSet{
{
Expand All @@ -184,7 +247,7 @@ func TestNewStatusPollerRunnerIdentifierValidation(t *testing.T) {
appsv1.SchemeGroupVersion.WithKind("Deployment"),
),
ClusterReaderFactory: ClusterReaderFactoryFunc(func(client.Reader, meta.RESTMapper, object.ObjMetadataSet) (ClusterReader, error) {
return testutil.NewNoopClusterReader(), nil
return fakecr.NewNoopClusterReader(), nil
}),
}

Expand Down Expand Up @@ -217,7 +280,7 @@ func (f *fakeStatusReader) Supports(schema.GroupKind) bool {
return true
}

func (f *fakeStatusReader) ReadStatus(_ context.Context, _ ClusterReader, identifier object.ObjMetadata) *event.ResourceStatus {
func (f *fakeStatusReader) ReadStatus(_ context.Context, _ ClusterReader, identifier object.ObjMetadata) (*event.ResourceStatus, error) {
count := f.resourceStatusCount[identifier.GroupKind]
resourceStatusSlice := f.resourceStatuses[identifier.GroupKind]
var resourceStatus status.Status
Expand All @@ -230,9 +293,9 @@ func (f *fakeStatusReader) ReadStatus(_ context.Context, _ ClusterReader, identi
return &event.ResourceStatus{
Identifier: identifier,
Status: resourceStatus,
}
}, nil
}

func (f *fakeStatusReader) ReadStatusForObject(_ context.Context, _ ClusterReader, _ *unstructured.Unstructured) *event.ResourceStatus {
return nil
func (f *fakeStatusReader) ReadStatusForObject(_ context.Context, _ ClusterReader, _ *unstructured.Unstructured) (*event.ResourceStatus, error) {
return nil, nil
}
14 changes: 11 additions & 3 deletions pkg/kstatus/polling/engine/status_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,18 @@ type StatusReader interface {
// ReadStatus will fetch the resource identified by the given identifier
// from the cluster and return an ResourceStatus that will contain
// information about the latest state of the resource, its computed status
// and information about any generated resources.
ReadStatus(ctx context.Context, reader ClusterReader, resource object.ObjMetadata) *event.ResourceStatus
// and information about any generated resources. Errors would usually be
// added to the event.ResourceStatus, but in the case of fatal errors
// that aren't connected to the particular resource, an error can also
// be returned. Currently, only context cancellation and deadline exceeded
// will cause an error to be returned.
ReadStatus(ctx context.Context, reader ClusterReader, resource object.ObjMetadata) (*event.ResourceStatus, error)

// ReadStatusForObject is similar to ReadStatus, but instead of looking up the
// resource based on an identifier, it will use the passed-in resource.
ReadStatusForObject(ctx context.Context, reader ClusterReader, object *unstructured.Unstructured) *event.ResourceStatus
// Errors would usually be added to the event.ResourceStatus, but in the case
// of fatal errors that aren't connected to the particular resource, an error
// can also be returned. Currently, only context cancellation and deadline exceeded
// will cause an error to be returned.
ReadStatusForObject(ctx context.Context, reader ClusterReader, object *unstructured.Unstructured) (*event.ResourceStatus, error)
}
Loading

0 comments on commit 3f86e78

Please sign in to comment.