From 4cb3024f7878ad0a3ca6e86cd44a80a72674ff0f Mon Sep 17 00:00:00 2001 From: Dakota Paasman <122491662+dpaasman00@users.noreply.github.com> Date: Wed, 29 Jan 2025 09:03:46 -0500 Subject: [PATCH] remove closing of done chan in shutdown --- receiver/awss3rehydrationreceiver/reciever.go | 6 +++++- receiver/azureblobrehydrationreceiver/receiver.go | 8 ++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/receiver/awss3rehydrationreceiver/reciever.go b/receiver/awss3rehydrationreceiver/reciever.go index 8d72e059e..43eff2c8d 100644 --- a/receiver/awss3rehydrationreceiver/reciever.go +++ b/receiver/awss3rehydrationreceiver/reciever.go @@ -158,7 +158,6 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { if r.cancelFunc != nil { r.cancelFunc() } - close(r.doneChan) // wait for any in-progress object rehydrations to finish done := make(chan struct{}) @@ -180,9 +179,14 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { if err := r.handleCheckpoint(ctx); err != nil { errs = errors.Join(errs, fmt.Errorf("handle checkpoint: %w", err)) } + + r.checkpointMutex.Lock() + defer r.checkpointMutex.Unlock() + if err := r.checkpointStore.Close(ctx); err != nil { errs = errors.Join(errs, fmt.Errorf("close checkpoint store: %w", err)) } + r.logger.Info("Shutdown complete") return errs } diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go index 39584344d..f4ea90706 100644 --- a/receiver/azureblobrehydrationreceiver/receiver.go +++ b/receiver/azureblobrehydrationreceiver/receiver.go @@ -174,9 +174,6 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { shutdownCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - // signal shutdown intent - close(r.doneChan) - // wait for any in-progress operations to finish done := make(chan struct{}) go func() { @@ -195,10 +192,13 @@ func (r *rehydrationReceiver) Shutdown(ctx context.Context) error { errs = errors.Join(errs, fmt.Errorf("error while saving checkpoint: %w", err)) } + r.mut.Lock() + defer r.mut.Unlock() + if err := r.checkpointStore.Close(shutdownCtx); err != nil { errs = errors.Join(errs, fmt.Errorf("error while closing checkpoint store: %w", err)) } - + r.logger.Info("Shutdown complete") return errs }