Skip to content

Commit

Permalink
changefeedccl: Ensure correct file ordering.
Browse files Browse the repository at this point in the history
When async flushing is enabled, the following sequence of events is
possible (even if very unlikely):

* k@t1 is emitted, causing async flush to write file f1
* k@t2 is emitted, causing async flush to write file f2
* f2 is written out before f1.

In this unlikely scenario -- and the reason why it's unlikely is that we
have to generate megabytes of data to cause a flush, unless the file_size
parameter was pathologically small -- if a client were to read the
contents of the directory right after step 2, and then read the directory
after step 3, the client will first observe k@t2, and then observe k@t1
-- which is a violation of ordering guarantees.

This PR fixes this issue by adopting a queue so that if there
is a pending flush in flight, the next flush is queued behind.

It is possible that this simple approach may result in a throughput
decrease.  This situation will occur if the underlying
storage (s3 or gcp) cannot keep up with writing out data before
the next file comes in.  However, at this point, such a
situation is unlikely for two reasons: one, the rest of the
changefeed machinery must be able to generate one or more files
worth of data before the previous flush completes -- and this
task in and of itself is not trivial, and two, the changefeed must
have enough memory allocated to it so that the pushback mechanism
does not trigger.  The above assumption was validated in a reasonably
sized test -- i.e. a non-zero queue depth was never observed.
Nonetheless, this PR also adds a log message which may be helpful
to detect the situation when the sink might not keep up with the
incoming data rate.

A more complex solution -- for example, allowing unlimited inflight
requests during backfill -- may be revisited later, if the above
assumption is proven incorrect.

Fixes cockroachdb#89683

Release note: None.
  • Loading branch information
Yevgeniy Miretskiy committed Oct 31, 2022
1 parent bc90f53 commit 3cf5565
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 31 deletions.
142 changes: 112 additions & 30 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"net/url"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -31,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -310,9 +310,16 @@ type cloudStorageSink struct {
metrics metricsRecorder

asyncFlushActive bool
flushCtx context.Context
flushGroup sync.WaitGroup
flushErr atomic.Value
flushGroup ctxgroup.Group
asyncFlushCh chan flushRequest // channel for submitting flush requests.
asyncFlushTermCh chan struct{} // channel closed by async flusher to indicate an error
asyncFlushErr error // set by async flusher, prior to closing asyncFlushTermCh
}

// flushRequest is a single unit of work submitted to the async flusher
// over cloudStorageSink.asyncFlushCh. A request carries either a file to
// write out or a flush barrier, distinguished by whether flush is nil.
type flushRequest struct {
// file is the sink file to write to storage; unset for barrier requests.
file *cloudStorageSinkFile
// dest is the destination passed to flushToStorage for file.
dest string
// flush, when non-nil, marks this request as a barrier: the async flusher
// closes the channel when it dequeues the request, instead of writing a file.
flush chan struct{}
}

var cloudStorageSinkIDAtomic int64
Expand All @@ -328,6 +335,16 @@ var partitionDateFormats = map[string]string{
}
var defaultPartitionFormat = partitionDateFormats["daily"]

// flushQueueDepth puts a limit on how many flush requests
// may be outstanding, before we block.
// In reality, we will block much sooner than this limit due
// to blocking buffer memory limits (in its default configuration);
// we just want this setting to be sufficiently large, but not
// so large as to have extremely large flush queues.
// The default of 256, with the default file size of 16MB, gives us
// a queue of 4GB (256 * 16MB) of outstanding flush data.
const flushQueueDepth = 256

func makeCloudStorageSink(
ctx context.Context,
u sinkURL,
Expand Down Expand Up @@ -375,8 +392,11 @@ func makeCloudStorageSink(
topicNamer: tn,
asyncFlushActive: enableAsyncFlush.Get(&settings.SV),
// TODO (yevgeniy): Consider adding ctx to Dial method instead.
flushCtx: ctx,
flushGroup: ctxgroup.WithContext(ctx),
asyncFlushCh: make(chan flushRequest, flushQueueDepth),
asyncFlushTermCh: make(chan struct{}),
}
s.flushGroup.GoCtx(s.asyncFlusher)

if partitionFormat := u.consumeParam(changefeedbase.SinkParamPartitionFormat); partitionFormat != "" {
dateFormat, ok := partitionDateFormats[partitionFormat]
Expand All @@ -388,8 +408,7 @@ func makeCloudStorageSink(
}

if s.timestampOracle != nil {
s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS())
s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat)
s.setDataFileTimestamp()
}

switch encodingOpts.Format {
Expand Down Expand Up @@ -523,7 +542,7 @@ func (s *cloudStorageSink) EmitResolvedTimestamp(

// Wait for previously issued async flush requests to complete
// before we write resolved time stamp file.
if err := s.waitAsyncFlush(); err != nil {
if err := s.waitAsyncFlush(ctx); err != nil {
return errors.Wrapf(err, "while emitting resolved timestamp")
}

Expand Down Expand Up @@ -590,13 +609,17 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
return err
}
s.files.Clear(true /* addNodesToFreeList */)
s.setDataFileTimestamp()
return s.waitAsyncFlush(ctx)
}

func (s *cloudStorageSink) setDataFileTimestamp() {
// Record the least resolved timestamp being tracked in the frontier as of this point,
// to use for naming files until the next `Flush()`. See comment on cloudStorageSink
// for an overview of the naming convention and proof of correctness.
s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS())
s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat)
return s.waitAsyncFlush()
ts := s.timestampOracle.inclusiveLowerBoundTS()
s.dataFileTs = cloudStorageFormatTime(ts)
s.dataFilePartition = ts.GoTime().Format(s.partitionFormat)
}

// enableAsyncFlush controls async flushing behavior for this sink.
Expand All @@ -608,22 +631,36 @@ var enableAsyncFlush = settings.RegisterBoolSetting(
)

// waitAsyncFlush waits until all async flushes complete.
func (s *cloudStorageSink) waitAsyncFlush() error {
s.flushGroup.Wait()
if v := s.flushErr.Load(); v != nil {
return v.(error)
func (s *cloudStorageSink) waitAsyncFlush(ctx context.Context) error {
done := make(chan struct{})
select {
case <-ctx.Done():
return ctx.Err()
case <-s.asyncFlushTermCh:
return s.asyncFlushErr
case s.asyncFlushCh <- flushRequest{flush: done}:
}

select {
case <-ctx.Done():
return ctx.Err()
case <-s.asyncFlushTermCh:
return s.asyncFlushErr
case <-done:
return nil
}
return nil
}

var logQueueDepth = log.Every(30 * time.Second)

// flushFile flushes file to the cloud storage.
// file should not be used after flushing.
func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error {
asyncFlushEnabled := enableAsyncFlush.Get(&s.settings.SV)
if s.asyncFlushActive && !asyncFlushEnabled {
// Async flush behavior was turned off -- drain any active flush requests
// before flushing this file.
if err := s.waitAsyncFlush(); err != nil {
if err := s.waitAsyncFlush(ctx); err != nil {
return err
}
}
Expand Down Expand Up @@ -651,21 +688,63 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink
return file.flushToStorage(ctx, s.es, dest, s.metrics)
}

s.flushGroup.Add(1)
go func() {
defer s.flushGroup.Done()
// NB: must use s.flushCtx; ctx may be short lived (i.e. cancelled).
if err := file.flushToStorage(s.flushCtx, s.es, dest, s.metrics); err != nil {
log.Errorf(ctx, "error flushing file to storage: %s", err)
// We must use the same type for error we store in flushErr.
s.flushErr.CompareAndSwap(nil, &flushError{error: err})
// Try to submit flush request, but produce warning message
// if we can't.
select {
case <-ctx.Done():
return ctx.Err()
case <-s.asyncFlushTermCh:
return s.asyncFlushErr
case s.asyncFlushCh <- flushRequest{file: file, dest: dest}:
return nil
default:
if logQueueDepth.ShouldLog() {
log.Infof(ctx, "changefeed flush queue is full; ~%d bytes to flush",
flushQueueDepth*s.targetMaxFileSize)
}
}()
return nil
}

// Queue was full, block until it's not.
select {
case <-ctx.Done():
return ctx.Err()
case <-s.asyncFlushTermCh:
return s.asyncFlushErr
case s.asyncFlushCh <- flushRequest{file: file, dest: dest}:
return nil
}
}

type flushError struct {
error
// asyncFlusher is the body of the background flush goroutine. It services
// requests from s.asyncFlushCh one at a time, in the order they were
// submitted, so a file is never written to storage before a file that was
// queued ahead of it.
//
// Each request is either a file to write to req.dest, or a barrier
// (req.flush != nil) whose channel is closed once the flusher reaches it,
// i.e. after every request queued before it has been handled.
//
// The loop exits when s.asyncFlushCh is closed (returning nil), when ctx is
// cancelled, or when a file fails to flush. s.asyncFlushTermCh is closed on
// every exit path; on the two failure paths s.asyncFlushErr is set first, so
// anyone who observes the closed channel reads a non-nil error.
func (s *cloudStorageSink) asyncFlusher(ctx context.Context) error {
	defer close(s.asyncFlushTermCh)

	for {
		select {
		case <-ctx.Done():
			// Record the cancellation before asyncFlushTermCh is closed.
			// Otherwise callers selecting on asyncFlushTermCh would read a nil
			// asyncFlushErr and treat files that were never written as
			// successfully flushed.
			s.asyncFlushErr = ctx.Err()
			return s.asyncFlushErr
		case req, ok := <-s.asyncFlushCh:
			if !ok {
				return nil // channel closed: orderly shutdown.
			}

			// Barrier request: everything queued ahead of it has been
			// processed, so release the waiter.
			if req.flush != nil {
				close(req.flush)
				continue
			}

			// Flush file to storage.
			flushDone := s.metrics.recordFlushRequestCallback()
			err := req.file.flushToStorage(ctx, s.es, req.dest, s.metrics)
			flushDone()

			if err != nil {
				log.Errorf(ctx, "error flushing file to storage: %s", err)
				// Must be set before the deferred close of asyncFlushTermCh
				// so that readers of the closed channel see the error.
				s.asyncFlushErr = err
				return err
			}
		}
	}
}

// flushToStorage writes out file into external storage into 'dest'.
Expand Down Expand Up @@ -698,7 +777,10 @@ func (f *cloudStorageSinkFile) flushToStorage(
// Close implements the Sink interface.
func (s *cloudStorageSink) Close() error {
s.files = nil
return errors.CombineErrors(s.waitAsyncFlush(), s.es.Close())
err := s.waitAsyncFlush(context.Background())
close(s.asyncFlushCh) // signal flusher to exit.
err = errors.CombineErrors(err, s.flushGroup.Wait())
return errors.CombineErrors(err, s.es.Close())
}

// Dial implements the Sink interface.
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/sink_cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ func TestCloudStorageSink(t *testing.T) {
testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) {
t.Helper()
testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) {
old := enableAsyncFlush.Get(&settings.SV)
enableAsyncFlush.Override(context.Background(), &settings.SV, enable)
defer enableAsyncFlush.Override(context.Background(), &settings.SV, old)
testFn(t)
})
}
Expand Down Expand Up @@ -460,7 +462,7 @@ func TestCloudStorageSink(t *testing.T) {
})

waitAsyncFlush := func(s Sink) error {
return s.(*cloudStorageSink).waitAsyncFlush()
return s.(*cloudStorageSink).waitAsyncFlush(context.Background())
}
testWithAndWithoutAsyncFlushing(t, `bucketing`, func(t *testing.T) {
t1 := makeTopic(`t1`)
Expand Down

0 comments on commit 3cf5565

Please sign in to comment.