Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
tareksha authored and Tarek Sharafi committed Jan 22, 2025
1 parent 3929238 commit c4ede82
Showing 1 changed file with 76 additions and 0 deletions.
76 changes: 76 additions & 0 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

type TestRequest struct {
Key string
}

var _ = Describe("controller", func() {
var fakeReconcile *fakeReconciler
var ctrl *Controller[reconcile.Request]
Expand Down Expand Up @@ -340,6 +344,41 @@ var _ = Describe("controller", func() {
Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times"))
})

It("should check for correct TypedSyncingSource if custom types are used", func() {
queue := &controllertest.TypedQueue[TestRequest]{
TypedInterface: workqueue.NewTyped[TestRequest](),
}
ctrl := &Controller[TestRequest]{
NewQueue: func(string, workqueue.TypedRateLimiter[TestRequest]) workqueue.TypedRateLimitingInterface[TestRequest] {
return queue
},
LogConstructor: func(*TestRequest) logr.Logger {
return log.RuntimeLog.WithName("controller").WithName("test")
},
}
ctrl.CacheSyncTimeout = time.Second
src := &bisignallingSource[TestRequest]{
startCall: make(chan workqueue.TypedRateLimitingInterface[TestRequest]),
startDone: make(chan error, 1),
waitCall: make(chan struct{}),
waitDone: make(chan error, 1),
}
ctrl.startWatches = []source.TypedSource[TestRequest]{src}
ctrl.Name = "foo"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
startCh := make(chan error)
go func() {
defer GinkgoRecover()
startCh <- ctrl.Start(ctx)
}()
Eventually(src.startCall).Should(Receive(Equal(queue)))
src.startDone <- nil
Eventually(src.waitCall).Should(BeClosed())
src.waitDone <- nil
cancel()
Eventually(startCh).Should(Receive(Succeed()))
})
})

Describe("Processing queue items from a Controller", func() {
Expand Down Expand Up @@ -901,3 +940,40 @@ func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Conte
<-ctx.Done()
return nil, errors.New("GetInformer timed out")
}

type bisignallingSource[T comparable] struct {
// receives the queue that is passed to Start
startCall chan workqueue.TypedRateLimitingInterface[T]
// passes an error to return from Start
startDone chan error
// closed when WaitForSync is called
waitCall chan struct{}
// passes an error to return from WaitForSync
waitDone chan error
}

var _ source.TypedSyncingSource[int] = (*bisignallingSource[int])(nil)

func (t *bisignallingSource[T]) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[T]) error {
select {
case t.startCall <- q:
case <-ctx.Done():
return ctx.Err()
}
select {
case err := <-t.startDone:
return err
case <-ctx.Done():
return ctx.Err()
}
}

func (t *bisignallingSource[T]) WaitForSync(ctx context.Context) error {
close(t.waitCall)
select {
case err := <-t.waitDone:
return err
case <-ctx.Done():
return ctx.Err()
}
}

0 comments on commit c4ede82

Please sign in to comment.