diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index 3b4eb5acbd47..ce10891f439e 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -18,7 +18,6 @@ import ( "net/url" "path/filepath" "strings" - "sync" "sync/atomic" "time" @@ -31,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -307,9 +307,16 @@ type cloudStorageSink struct { metrics metricsRecorder asyncFlushActive bool - flushCtx context.Context - flushGroup sync.WaitGroup - flushErr atomic.Value + flushGroup ctxgroup.Group + asyncFlushCh chan flushRequest // channel for submitting flush requests. + asyncFlushTermCh chan struct{} // channel closed by async flusher to indicate an error + asyncFlushErr error // set by async flusher, prior to closing asyncFlushTermCh +} + +type flushRequest struct { + file *cloudStorageSinkFile + dest string + flush chan struct{} } var cloudStorageSinkIDAtomic int64 @@ -325,6 +332,8 @@ var partitionDateFormats = map[string]string{ } var defaultPartitionFormat = partitionDateFormats["daily"] +const flushQueueDepth = 256 + func makeCloudStorageSink( ctx context.Context, u sinkURL, @@ -372,8 +381,11 @@ func makeCloudStorageSink( topicNamer: tn, asyncFlushActive: enableAsyncFlush.Get(&settings.SV), // TODO (yevgeniy): Consider adding ctx to Dial method instead. - flushCtx: ctx, + flushGroup: ctxgroup.WithContext(ctx), + asyncFlushCh: make(chan flushRequest, flushQueueDepth), + asyncFlushTermCh: make(chan struct{}), } + s.flushGroup.GoCtx(s.asyncFlusher) if partitionFormat := u.consumeParam(changefeedbase.SinkParamPartitionFormat); partitionFormat != "" { dateFormat, ok := partitionDateFormats[partitionFormat] @@ -385,8 +397,7 @@ func makeCloudStorageSink( } if s.timestampOracle != nil { - s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS()) - s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat) + s.setDataFileTimestamp() } switch encodingOpts.Format { @@ -520,7 +531,7 @@ func (s *cloudStorageSink) EmitResolvedTimestamp( // Wait for previously issued async flush requests to complete // before we write resolved time stamp file. - if err := s.waitAsyncFlush(); err != nil { + if err := s.waitAsyncFlush(ctx); err != nil { return errors.Wrapf(err, "while emitting resolved timestamp") } @@ -587,13 +598,17 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error { return err } s.files.Clear(true /* addNodesToFreeList */) + s.setDataFileTimestamp() + return s.waitAsyncFlush(ctx) +} +func (s *cloudStorageSink) setDataFileTimestamp() { // Record the least resolved timestamp being tracked in the frontier as of this point, // to use for naming files until the next `Flush()`. See comment on cloudStorageSink // for an overview of the naming convention and proof of correctness. - s.dataFileTs = cloudStorageFormatTime(s.timestampOracle.inclusiveLowerBoundTS()) - s.dataFilePartition = s.timestampOracle.inclusiveLowerBoundTS().GoTime().Format(s.partitionFormat) - return s.waitAsyncFlush() + ts := s.timestampOracle.inclusiveLowerBoundTS() + s.dataFileTs = cloudStorageFormatTime(ts) + s.dataFilePartition = ts.GoTime().Format(s.partitionFormat) } // enableAsyncFlush controls async flushing behavior for this sink. @@ -605,14 +620,28 @@ var enableAsyncFlush = settings.RegisterBoolSetting( ) // waitAsyncFlush waits until all async flushes complete. -func (s *cloudStorageSink) waitAsyncFlush() error { - s.flushGroup.Wait() - if v := s.flushErr.Load(); v != nil { - return v.(error) +func (s *cloudStorageSink) waitAsyncFlush(ctx context.Context) error { + done := make(chan struct{}) + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.asyncFlushTermCh: + return s.asyncFlushErr + case s.asyncFlushCh <- flushRequest{flush: done}: + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.asyncFlushTermCh: + return s.asyncFlushErr + case <-done: + return nil } - return nil } +var logQueueDepth = log.Every(30 * time.Second) + // flushFile flushes file to the cloud storage. // file should not be used after flushing. func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSinkFile) error { @@ -620,7 +649,7 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink if s.asyncFlushActive && !asyncFlushEnabled { // Async flush behavior was turned off -- drain any active flush requests // before flushing this file. - if err := s.waitAsyncFlush(); err != nil { + if err := s.waitAsyncFlush(ctx); err != nil { return err } } @@ -648,21 +677,63 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink return file.flushToStorage(ctx, s.es, dest, s.metrics) } - s.flushGroup.Add(1) - go func() { - defer s.flushGroup.Done() - // NB: must use s.flushCtx; ctx may be short lived (i.e. cancelled). - if err := file.flushToStorage(s.flushCtx, s.es, dest, s.metrics); err != nil { - log.Errorf(ctx, "error flushing file to storage: %s", err) - // We must use the same type for error we store in flushErr. - s.flushErr.CompareAndSwap(nil, &flushError{error: err}) + // Try to submit flush request, but produce warning message + // if we can't. + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.asyncFlushTermCh: + return s.asyncFlushErr + case s.asyncFlushCh <- flushRequest{file: file, dest: dest}: + return nil + default: + if logQueueDepth.ShouldLog() { + log.Infof(ctx, "changefeed flush queue is full; at least ~%d bytes to flush", + flushQueueDepth*s.targetMaxFileSize) } - }() - return nil + } + + // Queue was full, block until it's not. + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.asyncFlushTermCh: + return s.asyncFlushErr + case s.asyncFlushCh <- flushRequest{file: file, dest: dest}: + return nil + } } -type flushError struct { - error +func (s *cloudStorageSink) asyncFlusher(ctx context.Context) error { + defer close(s.asyncFlushTermCh) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case req, ok := <-s.asyncFlushCh: + if !ok { + return nil // we're done + } + + // handle flush request. + if req.flush != nil { + close(req.flush) + continue + } + + // flush file to storage. + flushDone := s.metrics.recordFlushRequestCallback() + err := req.file.flushToStorage(ctx, s.es, req.dest, s.metrics) + flushDone() + + if err != nil { + log.Errorf(ctx, "error flushing file to storage: %s", err) + s.asyncFlushErr = err + return err + } + } + } } // flushToStorage writes out file into external storage into 'dest'. @@ -695,7 +766,10 @@ func (f *cloudStorageSinkFile) flushToStorage( // Close implements the Sink interface. func (s *cloudStorageSink) Close() error { s.files = nil - return errors.CombineErrors(s.waitAsyncFlush(), s.es.Close()) + err := s.waitAsyncFlush(context.Background()) + close(s.asyncFlushCh) // Signal flusher to exit. + err = errors.CombineErrors(err, s.flushGroup.Wait()) + return errors.CombineErrors(err, s.es.Close()) } // Dial implements the Sink interface. diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index e925476068ca..f545dc03853d 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -197,7 +197,9 @@ func TestCloudStorageSink(t *testing.T) { testWithAndWithoutAsyncFlushing := func(t *testing.T, name string, testFn func(*testing.T)) { t.Helper() testutils.RunTrueAndFalse(t, name+"/asyncFlush", func(t *testing.T, enable bool) { + old := enableAsyncFlush.Get(&settings.SV) enableAsyncFlush.Override(context.Background(), &settings.SV, enable) + defer enableAsyncFlush.Override(context.Background(), &settings.SV, old) testFn(t) }) } @@ -460,7 +462,7 @@ func TestCloudStorageSink(t *testing.T) { }) waitAsyncFlush := func(s Sink) error { - return s.(*cloudStorageSink).waitAsyncFlush() + return s.(*cloudStorageSink).waitAsyncFlush(context.Background()) } testWithAndWithoutAsyncFlushing(t, `bucketing`, func(t *testing.T) { t1 := makeTopic(`t1`)