Skip to content

Commit

Permalink
🐛fix(controller): support WaitForSync in custom TypedSyncingSource (#…
Browse files Browse the repository at this point in the history
…3084)

* 🐛fix(controller): use generic WaitForSync

There is already support for defining `TypedSyncingSource` but the original code still checks for the original `SyncingSource` before callign `WaitForSync(ctx)` which does not work for custom typed controller.

this fix should be backported to v0.19

* test
  • Loading branch information
tareksha authored and Tarek Sharafi committed Jan 23, 2025
1 parent d4df90f commit 8017666
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *Controller[request]) Start(ctx context.Context) error {
c.LogConstructor(nil).Info("Starting Controller")

for _, watch := range c.startWatches {
syncingSource, ok := watch.(source.SyncingSource)
syncingSource, ok := watch.(source.TypedSyncingSource[request])
if !ok {
continue
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/internal/controller/controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ var _ = BeforeSuite(func() {

testenv = &envtest.Environment{}

var err error
cfg, err = testenv.Start()
Expect(err).NotTo(HaveOccurred())
// var err error
// cfg, err = testenv.Start()
// Expect(err).NotTo(HaveOccurred())

clientset, err = kubernetes.NewForConfig(cfg)
Expect(err).NotTo(HaveOccurred())
// clientset, err = kubernetes.NewForConfig(cfg)
// Expect(err).NotTo(HaveOccurred())
})

var _ = AfterSuite(func() {
Expand Down
76 changes: 76 additions & 0 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

type TestRequest struct {
Key string
}

var _ = Describe("controller", func() {
var fakeReconcile *fakeReconciler
var ctrl *Controller[reconcile.Request]
Expand Down Expand Up @@ -323,6 +327,41 @@ var _ = Describe("controller", func() {
Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times"))
})

It("should check for correct TypedSyncingSource if custom types are used", func() {
queue := &controllertest.TypedQueue[TestRequest]{
TypedInterface: workqueue.NewTyped[TestRequest](),
}
ctrl := &Controller[TestRequest]{
NewQueue: func(string, workqueue.TypedRateLimiter[TestRequest]) workqueue.TypedRateLimitingInterface[TestRequest] {
return queue
},
LogConstructor: func(*TestRequest) logr.Logger {
return log.RuntimeLog.WithName("controller").WithName("test")
},
}
ctrl.CacheSyncTimeout = time.Second
src := &bisignallingSource[TestRequest]{
startCall: make(chan workqueue.TypedRateLimitingInterface[TestRequest]),
startDone: make(chan error, 1),
waitCall: make(chan struct{}),
waitDone: make(chan error, 1),
}
ctrl.startWatches = []source.TypedSource[TestRequest]{src}
ctrl.Name = "foo"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
startCh := make(chan error)
go func() {
defer GinkgoRecover()
startCh <- ctrl.Start(ctx)
}()
Eventually(src.startCall).Should(Receive(Equal(queue)))
src.startDone <- nil
Eventually(src.waitCall).Should(BeClosed())
src.waitDone <- nil
cancel()
Eventually(startCh).Should(Receive(Succeed()))
})
})

Describe("Processing queue items from a Controller", func() {
Expand Down Expand Up @@ -875,3 +914,40 @@ func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Conte
<-ctx.Done()
return nil, errors.New("GetInformer timed out")
}

type bisignallingSource[T comparable] struct {
// receives the queue that is passed to Start
startCall chan workqueue.TypedRateLimitingInterface[T]
// passes an error to return from Start
startDone chan error
// closed when WaitForSync is called
waitCall chan struct{}
// passes an error to return from WaitForSync
waitDone chan error
}

var _ source.TypedSyncingSource[int] = (*bisignallingSource[int])(nil)

func (t *bisignallingSource[T]) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[T]) error {
select {
case t.startCall <- q:
case <-ctx.Done():
return ctx.Err()
}
select {
case err := <-t.startDone:
return err
case <-ctx.Done():
return ctx.Err()
}
}

func (t *bisignallingSource[T]) WaitForSync(ctx context.Context) error {
close(t.waitCall)
select {
case err := <-t.waitDone:
return err
case <-ctx.Done():
return ctx.Err()
}
}

0 comments on commit 8017666

Please sign in to comment.