From 05b69a4788cd0a61747323befc7d4985eaea9116 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Fri, 14 Oct 2022 16:39:58 -0400 Subject: [PATCH] changefeedccl: Ensure correct file ordering. When async flushing is enabled, the following sequence of events is possible (even if very unlikely): * k@t1 is emitted, causing async flush to write file f1 * k@t2 is emitted, causing async flush to write file f2 * f2 is written out before f1. In this unlikely scenario -- and the reason why it's unlikely is that we have to generate megabytes of data to cause a flush, unless the file_size parameter was pathologically small -- if a client were to read the contents of the directory right after step 2, and then read the directory after step 3, the client will first observe k@t2, and then observe k@t1 -- which is a violation of ordering guarantees. This PR fixes this issue by adopting a queue so that if there is a pending flush in flight, the next flush is queued behind. It is possible that this simple approach may result in a throughput decrease. This situation will occur if the underlying storage (s3 or gcp) cannot keep up with writing out data before the next file comes in. However, at this point, such a situation is unlikely for two reasons: one, the rest of the changefeed machinery must be able to generate one or more files' worth of data before the previous flush completes -- and this task in and of itself is not trivial, and two, the changefeed must have enough memory allocated to it so that the pushback mechanism does not trigger. The above assumption was validated in a reasonably sized test -- i.e. the non-zero queue depth was never observed. Nonetheless, this PR also adds a log message which may be helpful to detect the situation when the sink might not keep up with the incoming data rate. A more complex solution -- for example, allowing unlimited inflight requests during backfill -- may be revisited later, if the above assumption is proven incorrect. Fixes #89683 Release note: None. 
--- pkg/ccl/changefeedccl/sink_cloudstorage.go | 113 ++++++++++++++---- .../changefeedccl/sink_cloudstorage_test.go | 2 + 2 files changed, 93 insertions(+), 22 deletions(-) diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index 3b4eb5acbd47..63ab82052f00 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/google/btree" @@ -308,8 +309,19 @@ type cloudStorageSink struct { asyncFlushActive bool flushCtx context.Context - flushGroup sync.WaitGroup - flushErr atomic.Value + flushQueue struct { + syncutil.Mutex + err error + q []destFile + numGoRoutines int + // Signaled whenever changes are made to flushQueue struct. + flushC sync.Cond + } +} + +type destFile struct { + file *cloudStorageSinkFile + dest string } var cloudStorageSinkIDAtomic int64 @@ -374,6 +386,7 @@ func makeCloudStorageSink( // TODO (yevgeniy): Consider adding ctx to Dial method instead. 
flushCtx: ctx, } + s.flushQueue.flushC = sync.Cond{L: &s.flushQueue.Mutex} if partitionFormat := u.consumeParam(changefeedbase.SinkParamPartitionFormat); partitionFormat != "" { dateFormat, ok := partitionDateFormats[partitionFormat] @@ -385,8 +398,7 @@ func makeCloudStorageSink( } if s.timestampOracle != nil { - s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS()) - s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat) + s.setDataFileTimestamp() } switch encodingOpts.Format { @@ -587,13 +599,17 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error { return err } s.files.Clear(true /* addNodesToFreeList */) + s.setDataFileTimestamp() + return s.waitAsyncFlush() +} +func (s *cloudStorageSink) setDataFileTimestamp() { // Record the least resolved timestamp being tracked in the frontier as of this point, // to use for naming files until the next `Flush()`. See comment on cloudStorageSink // for an overview of the naming convention and proof of correctness. - s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS()) - s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat) - return s.waitAsyncFlush() + ts := s.timestampOracle.inclusiveLowerBoundTS() + s.dataFileTs = cloudStorageFormatTime(ts) + s.dataFilePartition = ts.GoTime().Format(s.partitionFormat) } // enableAsyncFlush controls async flushing behavior for this sink. @@ -606,13 +622,16 @@ var enableAsyncFlush = settings.RegisterBoolSetting( // waitAsyncFlush waits until all async flushes complete. 
func (s *cloudStorageSink) waitAsyncFlush() error { - s.flushGroup.Wait() - if v := s.flushErr.Load(); v != nil { - return v.(error) + s.flushQueue.Lock() + defer s.flushQueue.Unlock() + for s.flushQueue.numGoRoutines > 0 || (len(s.flushQueue.q) > 0 && s.flushQueue.err == nil) { + s.flushQueue.flushC.Wait() } - return nil + return s.flushQueue.err } +var logQueueDepth = log.Every(30 * time.Second) + // flushFile flushes file to the cloud storage. // file should not be used after flushing. func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error { @@ -648,21 +667,71 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink return file.flushToStorage(ctx, s.es, dest, s.metrics) } - s.flushGroup.Add(1) - go func() { - defer s.flushGroup.Done() - // NB: must use s.flushCtx; ctx may be short lived (i.e. cancelled). - if err := file.flushToStorage(s.flushCtx, s.es, dest, s.metrics); err != nil { - log.Errorf(ctx, "error flushing file to storage: %s", err) - // We must use the same type for error we store in flushErr. - s.flushErr.CompareAndSwap(nil, &flushError{error: err}) + s.flushQueue.Lock() + defer s.flushQueue.Unlock() + if s.flushQueue.err != nil { + return s.flushQueue.err + } + + s.flushQueue.q = append(s.flushQueue.q, destFile{file: file, dest: dest}) + if len(s.flushQueue.q) == 1 { + // Start a new go routine if the flush queue was empty. 
+ s.flushQueue.numGoRoutines++ + go s.asyncFlush() + } else { + if logQueueDepth.ShouldLog() { + depth := len(s.flushQueue.q) + log.Infof(ctx, "changefeed flush queue depth %d; ~%d bytes to flush", + depth, int64(depth)*s.targetMaxFileSize) } - }() + } return nil } -type flushError struct { - error +func (s *cloudStorageSink) asyncFlush() { + defer func() { + s.flushQueue.Lock() + defer s.flushQueue.Unlock() + defer s.flushQueue.flushC.Signal() + s.flushQueue.numGoRoutines-- + }() + + pop := func(err error) (*cloudStorageSinkFile, string, error) { + s.flushQueue.Lock() + defer s.flushQueue.Unlock() + defer s.flushQueue.flushC.Signal() + + if s.flushQueue.err != nil { + return nil, "", s.flushQueue.err + } + s.flushQueue.err = err + + if len(s.flushQueue.q) > 0 { + f := s.flushQueue.q[0] + s.flushQueue.q = s.flushQueue.q[1:] + return f.file, f.dest, s.flushQueue.err + } + s.flushQueue.q = nil // Allow GC to reclaim q. + return nil, "", nil + } + + f, dest, err := pop(nil) + if err != nil { + return + } + + for f != nil { + flushDone := s.metrics.recordFlushRequestCallback() + err := f.flushToStorage(s.flushCtx, s.es, dest, s.metrics) + flushDone() + if err != nil { + log.Errorf(s.flushCtx, "error flushing file to storage: %s", err) + } + f, dest, err = pop(err) + if err != nil { + return + } + } } // flushToStorage writes out file into external storage into 'dest'. 
diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index e925476068ca..b5e236f91073 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -197,7 +197,9 @@ func TestCloudStorageSink(t *testing.T) { testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) { t.Helper() testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) { + old := enableAsyncFlush.Get(&settings.SV) enableAsyncFlush.Override(context.Background(), &settings.SV, enable) + defer enableAsyncFlush.Override(context.Background(), &settings.SV, old) testFn(t) }) }