From ab91e9dd41cfe983bee05971d9f9c7907c23cd8d Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Tue, 1 Oct 2024 20:38:56 +0200 Subject: [PATCH] ingest storage: proper shutdown of partitionCommitter (#9436) This `partitionCommitter` would be shut down via the services manager as soon as the service context is cancelled. This means that they shut down in parallel with the `PartitionReader`. The race comes when the `partitionCommitter` has already shut down while the `PartitionReader` is still processing some records. Then when the `PartitionReader` tries to `enqueueCommit`, that sets the atomic, but does not send this to Kafka. As a result we may not always persist the latest commit to Kafka. Signed-off-by: Dimitar Dimitrov --- pkg/storage/ingest/reader.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index b6744557f66..6c148944077 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -177,7 +177,9 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) { if err != nil { return errors.Wrap(err, "creating service manager") } - err = services.StartManagerAndAwaitHealthy(ctx, r.dependencies) + // Use context.Background() because we want to stop all dependencies when the PartitionReader stops + // instead of stopping them when ctx is cancelled and while the PartitionReader is still running. + err = services.StartManagerAndAwaitHealthy(context.Background(), r.dependencies) if err != nil { return errors.Wrap(err, "starting service manager") }