From 5c3a83585280d4c8b7781748febef7429262af32 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 14 Oct 2022 16:39:58 -0400 Subject: [PATCH] changefeedccl: Ensure correct file ordering. When async flushing is enabled, the following sequence of events is possible (even if very unlikely): * k@t1 is emitted, causing async flush to write file f1 * k@t2 is emitted, causing async flush to write file f2 * f2 is written out before f1. In this unlikely scenario -- and the reason why it's unlikely is that we have to generate megabytes of data to cause a flush, unless the file_size parameter is pathologically small -- if a client were to read the contents of the directory right after step 2, and then read the directory after step 3, the client would first observe k@t2, and then observe k@t1 -- which is a violation of ordering guarantees. This PR fixes this issue by adopting a queue so that if there is a pending flush in flight, the next flush is queued behind it. It is possible that this simple approach may result in a throughput decrease. This situation will occur if the underlying storage (s3 or gcp) cannot keep up with writing out data before the next file comes in. However, at this point, such a situation is unlikely for two reasons: one, the rest of the changefeed machinery must be able to generate one or more files' worth of data before the previous flush completes -- and this task in and of itself is not trivial, and two, the changefeed must have enough memory allocated to it so that the pushback mechanism does not trigger. The above assumption was validated in a reasonably sized test -- i.e. a non-zero queue depth was never observed. Nonetheless, this PR also adds a log message which may be helpful to detect the situation when the sink might not keep up with the incoming data rate. A more complex solution -- for example, allowing unlimited in-flight requests during backfill -- may be revisited later, if the above assumption is proven incorrect. Fixes #89683 Release note: None. 
--- pkg/ccl/changefeedccl/sink_cloudstorage.go | 134 ++++++++++++++---- .../changefeedccl/sink_cloudstorage_test.go | 4 +- 2 files changed, 107 insertions(+), 31 deletions(-) diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index 3b4eb5acbd47..ce10891f439e 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -18,7 +18,6 @@ import ( "net/url" "path/filepath" "strings" - "sync" "sync/atomic" "time" @@ -31,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -307,9 +307,16 @@ type cloudStorageSink struct { metrics metricsRecorder asyncFlushActive bool - flushCtx context.Context - flushGroup sync.WaitGroup - flushErr atomic.Value + flushGroup ctxgroup.Group + asyncFlushCh chan flushRequest // channel for submitting flush requests. + asyncFlushTermCh chan struct{} // channel closed by async flusher to indicate an error + asyncFlushErr error // set by async flusher, prior to closing asyncFlushTermCh +} + +type flushRequest struct { + file *cloudStorageSinkFile + dest string + flush chan struct{} } var cloudStorageSinkIDAtomic int64 @@ -325,6 +332,8 @@ var partitionDateFormats = map[string]string{ } var defaultPartitionFormat = partitionDateFormats["daily"] +const flushQueueDepth = 256 + func makeCloudStorageSink( ctx context.Context, u sinkURL, @@ -372,8 +381,11 @@ func makeCloudStorageSink( topicNamer: tn, asyncFlushActive: enableAsyncFlush.Get(&settings.SV), // TODO (yevgeniy): Consider adding ctx to Dial method instead. 
- flushCtx: ctx, + flushGroup: ctxgroup.WithContext(ctx), + asyncFlushCh: make(chan flushRequest, flushQueueDepth), + asyncFlushTermCh: make(chan struct{}), } + s.flushGroup.GoCtx(s.asyncFlusher) if partitionFormat := u.consumeParam(changefeedbase.SinkParamPartitionFormat); partitionFormat != "" { dateFormat, ok := partitionDateFormats[partitionFormat] @@ -385,8 +397,7 @@ func makeCloudStorageSink( } if s.timestampOracle != nil { - s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS()) - s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat) + s.setDataFileTimestamp() } switch encodingOpts.Format { @@ -520,7 +531,7 @@ func (s *cloudStorageSink) EmitResolvedTimestamp( // Wait for previously issued async flush requests to complete // before we write resolved time stamp file. - if err := s.waitAsyncFlush(); err != nil { + if err := s.waitAsyncFlush(ctx); err != nil { return errors.Wrapf(err, "while emitting resolved timestamp") } @@ -587,13 +598,17 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error { return err } s.files.Clear(true /* addNodesToFreeList */) + s.setDataFileTimestamp() + return s.waitAsyncFlush(ctx) +} +func (s *cloudStorageSink) setDataFileTimestamp() { // Record the least resolved timestamp being tracked in the frontier as of this point, // to use for naming files until the next `Flush()`. See comment on cloudStorageSink // for an overview of the naming convention and proof of correctness. - s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS()) - s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat) - return s.waitAsyncFlush() + ts := s.timestampOracle.inclusiveLowerBoundTS() + s.dataFileTs = cloudStorageFormatTime(ts) + s.dataFilePartition = ts.GoTime().Format(s.partitionFormat) } // enableAsyncFlush controls async flushing behavior for this sink. 
@@ -605,14 +620,28 @@ var enableAsyncFlush = settings.RegisterBoolSetting( ) // waitAsyncFlush waits until all async flushes complete. -func (s *cloudStorageSink) waitAsyncFlush() error { - s.flushGroup.Wait() - if v := s.flushErr.Load(); v != nil { - return v.(error) +func (s *cloudStorageSink) waitAsyncFlush(ctx context.Context) error { + done := make(chan struct{}) + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.asyncFlushTermCh: + return s.asyncFlushErr + case s.asyncFlushCh <- flushRequest{flush: done}: + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.asyncFlushTermCh: + return s.asyncFlushErr + case <-done: + return nil } - return nil } +var logQueueDepth = log.Every(30 * time.Second) + // flushFile flushes file to the cloud storage. // file should not be used after flushing. func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error { @@ -620,7 +649,7 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink if s.asyncFlushActive && !asyncFlushEnabled { // Async flush behavior was turned off -- drain any active flush requests // before flushing this file. - if err := s.waitAsyncFlush(); err != nil { + if err := s.waitAsyncFlush(ctx); err != nil { return err } } @@ -648,21 +677,63 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink return file.flushToStorage(ctx, s.es, dest, s.metrics) } - s.flushGroup.Add(1) - go func() { - defer s.flushGroup.Done() - // NB: must use s.flushCtx; ctx may be short lived (i.e. cancelled). - if err := file.flushToStorage(s.flushCtx, s.es, dest, s.metrics); err != nil { - log.Errorf(ctx, "error flushing file to storage: %s", err) - // We must use the same type for error we store in flushErr. - s.flushErr.CompareAndSwap(nil, &flushError{error: err}) + // Try to submit flush request, but produce warning message + // if we can't. 
+ select { + case <-ctx.Done(): + return ctx.Err() + case <-s.asyncFlushTermCh: + return s.asyncFlushErr + case s.asyncFlushCh <- flushRequest{file: file, dest: dest}: + return nil + default: + if logQueueDepth.ShouldLog() { + log.Infof(ctx, "changefeed flush queue is full; at least ~%d bytes to flush", + flushQueueDepth*s.targetMaxFileSize) } - }() - return nil + } + + // Queue was full, block until it's not. + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.asyncFlushTermCh: + return s.asyncFlushErr + case s.asyncFlushCh <- flushRequest{file: file, dest: dest}: + return nil + } } -type flushError struct { - error +func (s *cloudStorageSink) asyncFlusher(ctx context.Context) error { + defer close(s.asyncFlushTermCh) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case req, ok := <-s.asyncFlushCh: + if !ok { + return nil // we're done + } + + // handle flush request. + if req.flush != nil { + close(req.flush) + continue + } + + // flush file to storage. + flushDone := s.metrics.recordFlushRequestCallback() + err := req.file.flushToStorage(ctx, s.es, req.dest, s.metrics) + flushDone() + + if err != nil { + log.Errorf(ctx, "error flushing file to storage: %s", err) + s.asyncFlushErr = err + return err + } + } + } } // flushToStorage writes out file into external storage into 'dest'. @@ -695,7 +766,10 @@ func (f *cloudStorageSinkFile) flushToStorage( // Close implements the Sink interface. func (s *cloudStorageSink) Close() error { s.files = nil - return errors.CombineErrors(s.waitAsyncFlush(), s.es.Close()) + err := s.waitAsyncFlush(context.Background()) + close(s.asyncFlushCh) // Signal flusher to exit. + err = errors.CombineErrors(err, s.flushGroup.Wait()) + return errors.CombineErrors(err, s.es.Close()) } // Dial implements the Sink interface. 
diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index e925476068ca..f545dc03853d 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -197,7 +197,9 @@ func TestCloudStorageSink(t *testing.T) { testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) { t.Helper() testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) { + old := enableAsyncFlush.Get(&settings.SV) enableAsyncFlush.Override(context.Background(), &settings.SV, enable) + defer enableAsyncFlush.Override(context.Background(), &settings.SV, old) testFn(t) }) } @@ -460,7 +462,7 @@ func TestCloudStorageSink(t *testing.T) { }) waitAsyncFlush := func(s Sink) error { - return s.(*cloudStorageSink).waitAsyncFlush() + return s.(*cloudStorageSink).waitAsyncFlush(context.Background()) } testWithAndWithoutAsyncFlushing(t, `bucketing`, func(t *testing.T) { t1 := makeTopic(`t1`)