Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use unique context for endpoint poller reconcile actions #489

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions internal/controller/bindings/boundendpoint_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,22 +216,23 @@ func (r *BoundEndpointPoller) reconcileBoundEndpointsFromAPI(ctx context.Context
toCreate, toUpdate, toDelete := r.filterBoundEndpointActions(ctx, existingBoundEndpoints, desiredBoundEndpoints)

// create context + errgroup for managing/closing the future goroutine in the reconcile actions loops
errGroup, ctx := errgroup.WithContext(ctx)
ctx, cancel := context.WithCancel(ctx)
errGroup, reconcileActionCtx := errgroup.WithContext(context.Background())
reconcileActionCtx, cancel := context.WithCancel(reconcileActionCtx)
reconcileActionCtx = ctrl.LoggerInto(reconcileActionCtx, log)
r.reconcilingCancel = cancel

// launch goroutines to reconcile the BoundEndpoints' actions in the background until the next polling loop

r.reconcileBoundEndpointAction(ctx, errGroup, toCreate, "create", func(ctx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.createBinding(ctx, binding)
r.reconcileBoundEndpointAction(reconcileActionCtx, errGroup, toCreate, "create", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.createBinding(reconcileActionCtx, binding)
})

r.reconcileBoundEndpointAction(ctx, errGroup, toUpdate, "update", func(ctx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.updateBinding(ctx, binding)
r.reconcileBoundEndpointAction(reconcileActionCtx, errGroup, toUpdate, "update", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.updateBinding(reconcileActionCtx, binding)
})

r.reconcileBoundEndpointAction(ctx, errGroup, toDelete, "delete", func(ctx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.deleteBinding(ctx, binding)
r.reconcileBoundEndpointAction(reconcileActionCtx, errGroup, toDelete, "delete", func(reconcileActionCtx context.Context, binding bindingsv1alpha1.BoundEndpoint) error {
return r.deleteBinding(reconcileActionCtx, binding)
})

return nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question Do we need an errGroup.Wait() here or are we just using the errgroup to spawn all of them at the same time and relying on the context being canceled? If the latter, it might be clearer if we drop the errgroup and just use goroutines for concurrent processing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I think you're right - I'm mixing up using errGroup and managing the ctx cancel myself. I'll clean this up.

Expand All @@ -245,6 +246,11 @@ type boundEndpointActionFn func(context.Context, bindingsv1alpha1.BoundEndpoint)
func (r *BoundEndpointPoller) reconcileBoundEndpointAction(ctx context.Context, errGroup *errgroup.Group, boundEndpoints []bindingsv1alpha1.BoundEndpoint, actionMsg string, action boundEndpointActionFn) {
log := ctrl.LoggerFrom(ctx)

if len(boundEndpoints) == 0 {
// nothing to do
return
}

errGroup.Go(func() error {
// attempt reconciliation actions every so often
ticker := time.NewTicker(5 * time.Second)
Expand All @@ -254,16 +260,17 @@ func (r *BoundEndpointPoller) reconcileBoundEndpointAction(ctx context.Context,
remainingBindings := boundEndpoints

for {
if len(remainingBindings) == 0 {
return nil // all bindings have been processed
}

select {
// stop go routine and return, there is a new reconcile poll happening actively
case <-ctx.Done():
log.Error(ctx.Err(), "Reconcile context canceled, stopping BoundEndpoint reconcile loop early", "action", actionMsg)
log.Info("Reconcile Action context canceled, stopping BoundEndpoint reconcile action loop early", "action", actionMsg)
return nil
case <-ticker.C:
log.V(9).Info("Received tick", "action", actionMsg, "remaining", remainingBindings)
if len(remainingBindings) == 0 {
return nil // all bindings have been processed
}

failedBindings := []bindingsv1alpha1.BoundEndpoint{}

Expand Down
Loading