Skip to content

Commit

Permalink
changefeedccl: Ensure correct file ordering.
Browse files Browse the repository at this point in the history
When async flushing enabled, the following sequence of events is
possible (even if very unlikely):

* k@t1 is emitted, causing async flush to write file f1
* k@t2 is emitted, causing async flush to write file f2
* f2 is written out before f1.

In this unlikely scenario -- and the reason why it's unlikely is that we
have to generate megabytes of data to cause a flush, unless, file_size
parameter was pathologically small -- if a client were to read the
contents of the directory right after step 2, and then read directory
after step 3, the client will first observe k@t2, and then observe k@t1
-- which is a violation of ordering guarantees.

This PR fixes this issue by adopting a queue so that if there
is a pending flush in flight, the next flush is queued behind.

It is possible that this simple approach may result in throughput
decrease.  This situation will occur if the underlying
storage (s3 or gcp) cannot keep up with writing out data before
the next file comes in.  However, at this point, such
situation is unlikely for two reasons: one, the rest of
changefeed machinery must be able to generate one or more files
worth of data before the previous flush completes -- and this
task in of itself is not trivial, and two, changefeed must
have enough memory allocated to it so that pushback mechanism
does not trigger.  The above assumption was validated in reasonably
sized test -- i.e. the non-zero queue depth was never observed.
Nonetheless, this PR also adds a log message which may be helpful
to detect the situation when the sink might not keep up with the
incoming data rate.

A more complex solution -- for example, allow unlimited inflight
requests during backfill -- may be revisited later, if the above
assumption proven incorrect.

Fixes #89683

Release note: None.
  • Loading branch information
Yevgeniy Miretskiy committed Oct 15, 2022
1 parent b955fd6 commit 1f77670
Showing 1 changed file with 77 additions and 22 deletions.
99 changes: 77 additions & 22 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/google/btree"
Expand Down Expand Up @@ -307,9 +308,18 @@ type cloudStorageSink struct {
metrics metricsRecorder

asyncFlushActive bool
flushCtx context.Context
flushGroup sync.WaitGroup
flushErr atomic.Value
flushCtx context.Context
flushQueue struct {
syncutil.Mutex
err error
q []destFile
}
}

type destFile struct {
file *cloudStorageSinkFile
dest string
}

var cloudStorageSinkIDAtomic int64
Expand Down Expand Up @@ -385,8 +395,7 @@ func makeCloudStorageSink(
}

if s.timestampOracle != nil {
s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS())
s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat)
s.setDataFileTimestamp()
}

switch encodingOpts.Format {
Expand Down Expand Up @@ -587,13 +596,17 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
return err
}
s.files.Clear(true /* addNodesToFreeList */)
s.setDataFileTimestamp()
return s.waitAsyncFlush()
}

func (s *cloudStorageSink) setDataFileTimestamp() {
// Record the least resolved timestamp being tracked in the frontier as of this point,
// to use for naming files until the next `Flush()`. See comment on cloudStorageSink
// for an overview of the naming convention and proof of correctness.
s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS())
s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat)
return s.waitAsyncFlush()
ts := s.timestampOracle.inclusiveLowerBoundTS()
s.dataFileTs = cloudStorageFormatTime(ts)
s.dataFilePartition = ts.GoTime().Format(s.partitionFormat)
}

// enableAsyncFlush controls async flushing behavior for this sink.
Expand All @@ -607,12 +620,13 @@ var enableAsyncFlush = settings.RegisterBoolSetting(
// waitAsyncFlush waits until all async flushes complete.
func (s *cloudStorageSink) waitAsyncFlush() error {
s.flushGroup.Wait()
if v := s.flushErr.Load(); v != nil {
return v.(error)
}
return nil
s.flushQueue.Lock()
defer s.flushQueue.Unlock()
return s.flushQueue.err
}

var logQueueDepth = log.Every(30 * time.Second)

// flushFile flushes file to the cloud storage.
// file should not be used after flushing.
func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error {
Expand Down Expand Up @@ -648,21 +662,62 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink
return file.flushToStorage(ctx, s.es, dest, s.metrics)
}

s.flushGroup.Add(1)
go func() {
defer s.flushGroup.Done()
// NB: must use s.flushCtx; ctx may be short lived (i.e. cancelled).
if err := file.flushToStorage(s.flushCtx, s.es, dest, s.metrics); err != nil {
log.Errorf(ctx, "error flushing file to storage: %s", err)
// We must use the same type for error we store in flushErr.
s.flushErr.CompareAndSwap(nil, &flushError{error: err})
s.flushQueue.Lock()
defer s.flushQueue.Unlock()
if s.flushQueue.err != nil {
return s.flushQueue.err
}

s.flushQueue.q = append(s.flushQueue.q, destFile{file: file, dest: dest})
if len(s.flushQueue.q) == 1 {
// Start a new go routine if the flush queue is empty.
s.flushGroup.Add(1)
go s.asyncFlush()
} else {
if logQueueDepth.ShouldLog() {
depth := len(s.flushQueue.q)
log.Infof(ctx, "changefeed flush queue depth %d; ~%d bytes to flush",
depth, int64(depth)*s.targetMaxFileSize)
}
}()
}
return nil
}

type flushError struct {
error
func (s *cloudStorageSink) asyncFlush() {
defer s.flushGroup.Done()

pop := func(err error) (*cloudStorageSinkFile, string, error) {
s.flushQueue.Lock()
defer s.flushQueue.Unlock()
if s.flushQueue.err != nil {
return nil, "", s.flushQueue.err
}
s.flushQueue.err = err

if len(s.flushQueue.q) > 0 {
f := s.flushQueue.q[0]
s.flushQueue.q = s.flushQueue.q[1:]
return f.file, f.dest, s.flushQueue.err
}
s.flushQueue.q = nil // Allow GC to reclaim q.
return nil, "", nil
}

f, dest, err := pop(nil)
if err != nil {
return
}

for f != nil {
err := f.flushToStorage(s.flushCtx, s.es, dest, s.metrics)
if err != nil {
log.Errorf(s.flushCtx, "error flushing file to storage: %s", err)
}
f, dest, err = pop(err)
if err != nil {
return
}
}
}

// flushToStorage writes out file into external storage into 'dest'.
Expand Down

0 comments on commit 1f77670

Please sign in to comment.