changefeedccl: Ensure correct file ordering.
When async flushing is enabled, the following sequence of events is
possible (even if very unlikely):

* k@t1 is emitted, causing async flush to write file f1
* k@t2 is emitted, causing async flush to write file f2
* f2 is written out before f1.

In this unlikely scenario -- unlikely because we have to generate
megabytes of data to trigger a flush, unless the file_size parameter
is pathologically small -- a client that reads the contents of the
directory after f2 has been written but before f1 (i.e. right after
step 3), and then reads it again once f1 lands, will first observe
k@t2 and only then k@t1 -- a violation of our ordering guarantees.
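
To make the race concrete, here is a minimal, runnable sketch of the
problem (hypothetical names; not changefeed code): two asynchronous
writes are issued in order, but the second one finishes first.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        var (
            mu  sync.Mutex
            dir []string // stands in for the destination directory listing
            wg  sync.WaitGroup
        )
        write := func(name string, latency time.Duration) {
            defer wg.Done()
            time.Sleep(latency) // simulated storage round trip
            mu.Lock()
            defer mu.Unlock()
            dir = append(dir, name)
        }
        wg.Add(2)
        go write("f1 (k@t1)", 50*time.Millisecond) // issued first, slower
        go write("f2 (k@t2)", 10*time.Millisecond) // issued second, faster
        wg.Wait()
        // Typically prints [f2 (k@t2) f1 (k@t1)]: t2 visible before t1.
        fmt.Println(dir)
    }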

This PR fixes the issue by adopting a queue: if a flush is already in
flight, the next flush is queued behind it.
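
The shape of the fix, as a simplified, self-contained sketch
(hypothetical type names; the real implementation is in
sink_cloudstorage.go below): a producer appends to a mutex-protected
FIFO queue and starts a single drainer goroutine only when the queue
was empty, so at most one write is in flight and files land in order.

    package main

    import (
        "sync"
        "time"
    )

    type orderedFlusher struct {
        mu      sync.Mutex
        q       []string // destinations of pending files, FIFO
        running bool     // true while a drainer goroutine exists
    }

    func (f *orderedFlusher) enqueue(dest string) {
        f.mu.Lock()
        defer f.mu.Unlock()
        f.q = append(f.q, dest)
        if !f.running {
            f.running = true
            go f.drain() // single drainer: later flushes queue behind it
        }
    }

    func (f *orderedFlusher) drain() {
        for {
            f.mu.Lock()
            if len(f.q) == 0 {
                f.running = false
                f.mu.Unlock()
                return
            }
            dest := f.q[0]
            f.q = f.q[1:]
            f.mu.Unlock()
            writeToStorage(dest) // one write at a time, in FIFO order
        }
    }

    // writeToStorage stands in for the actual cloud storage upload.
    func writeToStorage(dest string) {}

    func main() {
        f := &orderedFlusher{}
        f.enqueue("f1")
        f.enqueue("f2") // queues behind f1 if f1 is still being written
        for { // crude wait-for-drain; the real code uses sync.Cond
            f.mu.Lock()
            done := !f.running
            f.mu.Unlock()
            if done {
                return
            }
            time.Sleep(time.Millisecond)
        }
    }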

This simple approach may reduce throughput. That would happen if the
underlying storage (e.g. S3 or GCS) cannot finish writing out one file
before the next one arrives. However, at this point such a situation
is unlikely for two reasons: first, the rest of the changefeed
machinery must generate one or more files' worth of data before the
previous flush completes -- not a trivial task in itself -- and
second, the changefeed must have enough memory allocated to it that
the pushback mechanism does not trigger. This assumption was validated
in a reasonably sized test: a non-zero queue depth was never observed.
Nonetheless, this PR also adds a log message to help detect the
situation where the sink cannot keep up with the incoming data rate
(see the sketch below).
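
The added log message is rate limited. A standalone sketch of the
shape (the real code uses the log.Every helper from pkg/util/log; the
names below are hypothetical):

    package main

    import (
        "log"
        "sync"
        "time"
    )

    // every allows an event through at most once per interval.
    type every struct {
        mu       sync.Mutex
        interval time.Duration
        last     time.Time
    }

    func (e *every) shouldLog() bool {
        e.mu.Lock()
        defer e.mu.Unlock()
        if now := time.Now(); now.Sub(e.last) >= e.interval {
            e.last = now
            return true
        }
        return false
    }

    var queueDepthLogger = every{interval: 30 * time.Second}

    // noteQueueDepth warns (at most every 30s) when flushes queue up,
    // i.e. when the sink may not keep up with the incoming data rate.
    func noteQueueDepth(depth int, targetMaxFileSize int64) {
        if depth > 1 && queueDepthLogger.shouldLog() {
            log.Printf("changefeed flush queue depth %d; ~%d bytes to flush",
                depth, int64(depth)*targetMaxFileSize)
        }
    }

    func main() {
        noteQueueDepth(3, 16<<20) // e.g. ~48MB backed up behind a flush
    }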

A more complex solution -- for example, allowing unlimited in-flight
requests during backfill -- may be revisited later if the above
assumption proves incorrect.

Fixes cockroachdb#89683

Release note: None.
Yevgeniy Miretskiy committed Oct 18, 2022
1 parent b955fd6 commit 05b69a4
Showing 2 changed files with 93 additions and 22 deletions.
pkg/ccl/changefeedccl/sink_cloudstorage.go (91 additions, 22 deletions)
@@ -34,6 +34,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/google/btree"
@@ -308,8 +309,19 @@ type cloudStorageSink struct {
 
 	asyncFlushActive bool
 	flushCtx         context.Context
-	flushGroup       sync.WaitGroup
-	flushErr         atomic.Value
+	flushQueue       struct {
+		syncutil.Mutex
+		err           error
+		q             []destFile
+		numGoRoutines int
+		// Signaled whenever changes are made to flushQueue struct.
+		flushC sync.Cond
+	}
 }
 
+type destFile struct {
+	file *cloudStorageSinkFile
+	dest string
+}
+
 var cloudStorageSinkIDAtomic int64
@@ -374,6 +386,7 @@ func makeCloudStorageSink(
 		// TODO (yevgeniy): Consider adding ctx to Dial method instead.
 		flushCtx: ctx,
 	}
+	s.flushQueue.flushC = sync.Cond{L: &s.flushQueue.Mutex}
 
 	if partitionFormat := u.consumeParam(changefeedbase.SinkParamPartitionFormat); partitionFormat != "" {
 		dateFormat, ok := partitionDateFormats[partitionFormat]
@@ -385,8 +398,7 @@
 	}
 
 	if s.timestampOracle != nil {
-		s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS())
-		s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat)
+		s.setDataFileTimestamp()
 	}
 
 	switch encodingOpts.Format {
@@ -587,13 +599,17 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
 		return err
 	}
 	s.files.Clear(true /* addNodesToFreeList */)
-
+	s.setDataFileTimestamp()
+	return s.waitAsyncFlush()
+}
+
+func (s *cloudStorageSink) setDataFileTimestamp() {
 	// Record the least resolved timestamp being tracked in the frontier as of this point,
 	// to use for naming files until the next `Flush()`. See comment on cloudStorageSink
 	// for an overview of the naming convention and proof of correctness.
-	s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS())
-	s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat)
-	return s.waitAsyncFlush()
+	ts := s.timestampOracle.inclusiveLowerBoundTS()
+	s.dataFileTs = cloudStorageFormatTime(ts)
+	s.dataFilePartition = ts.GoTime().Format(s.partitionFormat)
 }
 
 // enableAsyncFlush controls async flushing behavior for this sink.
@@ -606,13 +622,16 @@ var enableAsyncFlush = settings.RegisterBoolSetting(
 
 // waitAsyncFlush waits until all async flushes complete.
 func (s *cloudStorageSink) waitAsyncFlush() error {
-	s.flushGroup.Wait()
-	if v := s.flushErr.Load(); v != nil {
-		return v.(error)
+	s.flushQueue.Lock()
+	defer s.flushQueue.Unlock()
+	for s.flushQueue.numGoRoutines > 0 || (len(s.flushQueue.q) > 0 && s.flushQueue.err == nil) {
+		s.flushQueue.flushC.Wait()
 	}
-	return nil
+	return s.flushQueue.err
 }
 
+var logQueueDepth = log.Every(30 * time.Second)
+
 // flushFile flushes file to the cloud storage.
 // file should not be used after flushing.
 func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error {
@@ -648,21 +667,71 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error {
 		return file.flushToStorage(ctx, s.es, dest, s.metrics)
 	}
 
-	s.flushGroup.Add(1)
-	go func() {
-		defer s.flushGroup.Done()
-		// NB: must use s.flushCtx; ctx may be short lived (i.e. cancelled).
-		if err := file.flushToStorage(s.flushCtx, s.es, dest, s.metrics); err != nil {
-			log.Errorf(ctx, "error flushing file to storage: %s", err)
-			// We must use the same type for error we store in flushErr.
-			s.flushErr.CompareAndSwap(nil, &flushError{error: err})
+	s.flushQueue.Lock()
+	defer s.flushQueue.Unlock()
+	if s.flushQueue.err != nil {
+		return s.flushQueue.err
 	}
+
+	s.flushQueue.q = append(s.flushQueue.q, destFile{file: file, dest: dest})
+	if len(s.flushQueue.q) == 1 {
+		// Start a new go routine if the flush queue was empty.
+		s.flushQueue.numGoRoutines++
+		go s.asyncFlush()
+	} else {
+		if logQueueDepth.ShouldLog() {
+			depth := len(s.flushQueue.q)
+			log.Infof(ctx, "changefeed flush queue depth %d; ~%d bytes to flush",
+				depth, int64(depth)*s.targetMaxFileSize)
+		}
-	}()
+	}
 	return nil
 }
 
-type flushError struct {
-	error
+func (s *cloudStorageSink) asyncFlush() {
+	defer func() {
+		s.flushQueue.Lock()
+		defer s.flushQueue.Unlock()
+		defer s.flushQueue.flushC.Signal()
+		s.flushQueue.numGoRoutines--
+	}()
+
+	pop := func(err error) (*cloudStorageSinkFile, string, error) {
+		s.flushQueue.Lock()
+		defer s.flushQueue.Unlock()
+		defer s.flushQueue.flushC.Signal()
+
+		if s.flushQueue.err != nil {
+			return nil, "", s.flushQueue.err
+		}
+		s.flushQueue.err = err
+
+		if len(s.flushQueue.q) > 0 {
+			f := s.flushQueue.q[0]
+			s.flushQueue.q = s.flushQueue.q[1:]
+			return f.file, f.dest, s.flushQueue.err
+		}
+		s.flushQueue.q = nil // Allow GC to reclaim q.
+		return nil, "", nil
+	}
+
+	f, dest, err := pop(nil)
+	if err != nil {
+		return
+	}
+
+	for f != nil {
+		flushDone := s.metrics.recordFlushRequestCallback()
+		err := f.flushToStorage(s.flushCtx, s.es, dest, s.metrics)
+		flushDone()
+		if err != nil {
+			log.Errorf(s.flushCtx, "error flushing file to storage: %s", err)
+		}
+		f, dest, err = pop(err)
+		if err != nil {
+			return
+		}
+	}
 }
 
 // flushToStorage writes out file into external storage into 'dest'.
pkg/ccl/changefeedccl/sink_cloudstorage_test.go (2 additions, 0 deletions)
@@ -197,7 +197,9 @@ func TestCloudStorageSink(t *testing.T) {
 	testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) {
 		t.Helper()
 		testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) {
+			old := enableAsyncFlush.Get(&settings.SV)
 			enableAsyncFlush.Override(context.Background(), &settings.SV, enable)
+			defer enableAsyncFlush.Override(context.Background(), &settings.SV, old)
 			testFn(t)
 		})
 	}
