backupccl: more controlled shutdown during job cancellation
Previously, we passed the import Resumer's context directly to our
DistSQLReceiver and to (*sql.DistSQLPlanner).Run. This context is
canceled when the user cancels or pauses a job. In practice, this
setup made it very common for dsp.Run to return before the processors
had shut down.

Here, we create a separate context for the distsql flow. When the
Resumer's context is canceled, we call SetError on the
DistSQLReceiver, which transitions the receiver to DrainRequested;
that status is then propagated to the remote processors. Eventually,
this leads to all remote processors exiting and the entire flow
shutting down.

Note that the draining status is only propagated when a message is
actually pushed from a processor. We push progress messages from the
import processors to the distsql receivers every 10 seconds or so.
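
Loosely, from a producer's point of view, the contract looks like the
following sketch (this only illustrates the existing
execinfra.RowReceiver behavior; pushProgressAndCheck and its arguments
are hypothetical, not code added by this change):

    // pushProgressAndCheck illustrates that a producer only learns about
    // draining from the status returned by Push; until it pushes
    // something, it keeps producing as before.
    func pushProgressAndCheck(
        out execinfra.RowReceiver, meta *execinfrapb.ProducerMetadata,
    ) bool {
        switch out.Push(nil /* row */, meta) {
        case execinfra.NeedMoreRows:
            return true // keep producing rows and progress messages
        case execinfra.DrainRequested:
            return false // stop producing rows; only trailing metadata now
        default: // execinfra.ConsumerClosed
            return false // shut down immediately
        }
    }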

To protect against waiting too long, we explicitly cancel the flow
after a timeout.
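
The watcher added below (cancelWatcher) roughly follows this shape;
watchCancel and its parameter names are illustrative stand-ins, built
only on the standard context and time packages:

    // watchCancel waits for the Resumer's context to be canceled, asks
    // the receiver to drain, and cancels the flow context only if the
    // flow has not wound down within the timeout.
    func watchCancel(
        resumerCtx context.Context, // canceled on job PAUSE/CANCEL
        requestDrain func(error), // e.g. recv.SetError
        cancelFlow context.CancelFunc, // cancels the separate flow context
        flowDone <-chan struct{}, // closed once dsp.Run has returned
        timeout time.Duration,
    ) {
        select {
        case <-resumerCtx.Done():
            requestDrain(resumerCtx.Err()) // receiver moves to DrainRequested
            select {
            case <-flowDone: // the flow drained gracefully in time
            case <-time.After(timeout):
                cancelFlow() // give up waiting and cancel the flow outright
            }
        case <-flowDone: // the flow finished before any cancelation
        }
    }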

Further, goroutines in the import processors that were previously
unmanaged are now explicitly managed in a context group that we wait
on during shutdown, similar to other job-related processors.
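
Detached from the processor type, that shutdown pattern is roughly the
following sketch (managedWorker is a hypothetical stand-in; ctxgroup is
the existing util/ctxgroup package):

    // managedWorker runs its work in a ctxgroup tied to a cancellable
    // child context; close cancels that context and waits, so the
    // goroutine cannot outlive its owner.
    type managedWorker struct {
        cancel context.CancelFunc
        grp    ctxgroup.Group
    }

    func (w *managedWorker) start(ctx context.Context, work func(context.Context) error) {
        grpCtx, cancel := context.WithCancel(ctx)
        w.cancel = cancel
        w.grp = ctxgroup.WithContext(grpCtx)
        w.grp.GoCtx(work)
    }

    func (w *managedWorker) close() {
        w.cancel()
        _ = w.grp.Wait() // any error from work is surfaced elsewhere
    }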

Overall, in the included test, this substantially reduces the
frequency at which we see import processors outliving the running job.

Release note: None
stevendanna committed Nov 14, 2022
1 parent a8b0cd9 commit 87cbbe1
Showing 4 changed files with 198 additions and 25 deletions.
1 change: 1 addition & 0 deletions pkg/sql/importer/import_job.go
@@ -61,6 +61,7 @@ import (
type importTestingKnobs struct {
afterImport func(summary roachpb.RowCount) error
beforeRunDSP func() error
onSetupFinish func()
alwaysFlushJobProgress bool
}

65 changes: 48 additions & 17 deletions pkg/sql/importer/import_processor.go
@@ -52,6 +52,8 @@ var csvOutputTypes = []*types.T{

const readImportDataProcessorName = "readImportDataProcessor"

var progressUpdateInterval = time.Second * 10

var importPKAdderBufferSize = func() *settings.ByteSizeSetting {
s := settings.RegisterByteSizeSetting(
settings.TenantWritable,
@@ -123,6 +125,8 @@ type readImportDataProcessor struct {
spec execinfrapb.ReadImportDataSpec
output execinfra.RowReceiver

cancel context.CancelFunc
wg ctxgroup.Group
progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress

seqChunkProvider *row.SeqChunkProvider
@@ -144,45 +148,52 @@ func newReadImportDataProcessor(
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
cp := &readImportDataProcessor{
idp := &readImportDataProcessor{
flowCtx: flowCtx,
spec: spec,
output: output,
progCh: make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress),
}
if err := cp.Init(ctx, cp, post, csvOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
if err := idp.Init(ctx, idp, post, csvOutputTypes, flowCtx, processorID, output, nil, /* memMonitor */
execinfra.ProcStateOpts{
// This processor doesn't have any inputs to drain.
InputsToDrain: nil,
TrailingMetaCallback: func() []execinfrapb.ProducerMetadata {
idp.close()
return nil
},
}); err != nil {
return nil, err
}

// Load the import job running the import in case any of the columns have a
// default expression which uses sequences. In this case we need to update the
// job progress within the import processor.
if cp.flowCtx.Cfg.JobRegistry != nil {
cp.seqChunkProvider = &row.SeqChunkProvider{
JobID: cp.spec.Progress.JobID,
Registry: cp.flowCtx.Cfg.JobRegistry,
DB: cp.flowCtx.Cfg.DB,
if idp.flowCtx.Cfg.JobRegistry != nil {
idp.seqChunkProvider = &row.SeqChunkProvider{
JobID: idp.spec.Progress.JobID,
Registry: idp.flowCtx.Cfg.JobRegistry,
DB: idp.flowCtx.Cfg.DB,
}
}

return cp, nil
return idp, nil
}

// Start is part of the RowSource interface.
func (idp *readImportDataProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", idp.spec.JobID)
ctx = idp.StartInternal(ctx, readImportDataProcessorName)
// We don't have to worry about this go routine leaking because next we loop over progCh
// which is closed only after the go routine returns.
go func() {

grpCtx, cancel := context.WithCancel(ctx)
idp.cancel = cancel
idp.wg = ctxgroup.WithContext(grpCtx)
idp.wg.GoCtx(func(ctx context.Context) error {
defer close(idp.progCh)
idp.summary, idp.importErr = runImport(ctx, idp.flowCtx, &idp.spec, idp.progCh,
idp.seqChunkProvider)
}()
return nil
})
}

// Next is part of the RowSource interface.
@@ -221,6 +232,22 @@ func (idp *readImportDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pro
}, nil
}

func (idp *readImportDataProcessor) ConsumerClosed() {
idp.close()
}

func (idp *readImportDataProcessor) close() {
// idp.Closed is set by idp.InternalClose().
if idp.Closed {
return
}

idp.cancel()
_ = idp.wg.Wait()

idp.InternalClose()
}

func injectTimeIntoEvalCtx(evalCtx *eval.Context, walltime int64) {
sec := walltime / int64(time.Second)
nsec := walltime % int64(time.Second)
@@ -444,7 +471,7 @@ func ingestKvs(
offset++
}

pushProgress := func() {
pushProgress := func(ctx context.Context) {
var prog execinfrapb.RemoteProducerMetadata_BulkProcessorProgress
prog.ResumePos = make(map[int32]int64)
prog.CompletedFraction = make(map[int32]float32)
@@ -465,14 +492,18 @@ func ingestKvs(
bulkSummaryMu.summary.Reset()
bulkSummaryMu.Unlock()
}
progCh <- prog
select {
case progCh <- prog:
case <-ctx.Done():
}

}

// stopProgress will be closed when there is no more progress to report.
stopProgress := make(chan struct{})
g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
tick := time.NewTicker(time.Second * 10)
tick := time.NewTicker(progressUpdateInterval)
defer tick.Stop()
done := ctx.Done()
for {
@@ -482,7 +513,7 @@ func ingestKvs(
case <-stopProgress:
return nil
case <-tick.C:
pushProgress()
pushProgress(ctx)
}
}
})
@@ -544,7 +575,7 @@ func ingestKvs(
if flowCtx.Cfg.TestingKnobs.BulkAdderFlushesEveryBatch {
_ = pkIndexAdder.Flush(ctx)
_ = indexAdder.Flush(ctx)
pushProgress()
pushProgress(ctx)
}
}
return nil
98 changes: 90 additions & 8 deletions pkg/sql/importer/import_processor_planning.go
@@ -32,9 +32,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/logtags"
)

var replanThreshold = settings.RegisterFloatSetting(
@@ -217,8 +220,9 @@ func distImport(
}
}

flowCtx, watcher := newCancelWatcher(ctx, 3*progressUpdateInterval)
recv := sql.MakeDistSQLReceiver(
ctx,
flowCtx,
sql.NewMetadataCallbackWriter(rowResultWriter, metaFn),
tree.Rows,
nil, /* rangeCache */
@@ -238,6 +242,10 @@ func distImport(
stopProgress := make(chan struct{})

g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
watcher.watch(ctx, recv)
return nil
})
g.GoCtx(func(ctx context.Context) error {
tick := time.NewTicker(time.Second * 10)
defer tick.Stop()
@@ -257,19 +265,20 @@ func distImport(
}
})

if testingKnobs.beforeRunDSP != nil {
if err := testingKnobs.beforeRunDSP(); err != nil {
return roachpb.BulkOpSummary{}, err
}
}

g.GoCtx(func(ctx context.Context) error {
defer cancelReplanner()
defer close(stopProgress)
defer watcher.stop()

if testingKnobs.beforeRunDSP != nil {
if err := testingKnobs.beforeRunDSP(); err != nil {
return err
}
}

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(ctx, planCtx, nil, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
dsp.Run(flowCtx, planCtx, nil, p, recv, &evalCtxCopy, testingKnobs.onSetupFinish)
return rowResultWriter.Err()
})

@@ -282,6 +291,79 @@ func distImport(
return res, nil
}

// cancelWatcher is used to handle job PAUSE and CANCEL
// gracefully.
//
// When a job is canceled or paused, the context passed to the
// Resumer is canceled by the job system. Rather than passing the
// Resumer's context directly to our DistSQL receiver and planner, we
// instead construct a new context for the distsql machinery and watch
// the original context for cancelation using this cancelWatcher.
//
// When the cancelWatcher sees a cancelation on the watched context,
// it informs the DistSQL receiver passed to watch by calling
// SetError. This gives the DistSQL flow a chance to drain gracefully.
//
// However, we do not want to wait forever. After SetError is called,
// we wait up to `timeout` before canceling the context returned by
// newCancelWatcher.
type cancelWatcher struct {
watchedCtx context.Context
timeout time.Duration

done chan struct{}
cancel context.CancelFunc
}

// newCancelWatcher constructs a cancelWatcher. To start the watcher,
// call watch. The context passed to newCancelWatcher will be watched
// for cancelation. The returned context should be used for the
// DistSQL receiver and DistSQLPlanner.
func newCancelWatcher(
ctxToWatch context.Context, timeout time.Duration,
) (context.Context, *cancelWatcher) {
ctx, cancel := context.WithCancel(
logtags.AddTags(
context.Background(),
logtags.FromContext(ctxToWatch)))
return ctx, &cancelWatcher{
watchedCtx: ctxToWatch,
timeout: timeout,

done: make(chan struct{}),
cancel: cancel,
}
}

// watch starts watching the context passed to newCancelWatcher for
// cancelation and notifies the given DistSQLReceiver when a
// cancelation occurs.
//
// After cancelation, if the watcher is not stopped before the
// configured timeout, the context returned from the constructor is
// canceled.
func (c *cancelWatcher) watch(ctx context.Context, recv *sql.DistSQLReceiver) {
select {
case <-c.watchedCtx.Done():
recv.SetError(c.watchedCtx.Err())
timer := timeutil.NewTimer()
defer timer.Stop()
timer.Reset(c.timeout)
select {
case <-c.done:
case <-timer.C:
timer.Read = true
log.Warningf(ctx, "watcher not stopped after %s, canceling flow context", c.timeout)
c.cancel()
}
case <-c.done:
}
}

func (c *cancelWatcher) stop() {
close(c.done)
}

func getLastImportSummary(job *jobs.Job) roachpb.BulkOpSummary {
progress := job.Progress()
importProgress := progress.GetImport()
59 changes: 59 additions & 0 deletions pkg/sql/importer/import_stmt_test.go
@@ -2079,6 +2079,65 @@ func TestFailedImportGC(t *testing.T) {
tests.CheckKeyCount(t, kvDB, td.TableSpan(keys.SystemSQLCodec), 0)
}

// TestImportIntoCSVCancel cancels a distributed import. This test
// currently has few assertions but is essentially a regression test,
// since the cancellation process previously leaked goroutines.
func TestImportIntoCSVCancel(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderShort(t)
skip.UnderRace(t, "takes >1min under race")

const nodes = 3

numFiles := nodes + 2
rowsPerFile := 5000
rowsPerRaceFile := 16

defer TestingSetParallelImporterReaderBatchSize(1)()

ctx := context.Background()
baseDir := testutils.TestDataPath(t, "csv")
tc := serverutils.StartNewTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
BulkAdderFlushesEveryBatch: true,
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
DisableDefaultTestTenant: true,
ExternalIODir: baseDir,
}})
defer tc.Stopper().Stop(ctx)
conn := tc.ServerConn(0)

setupDoneCh := make(chan struct{})
for i := 0; i < tc.NumServers(); i++ {
tc.Server(i).JobRegistry().(*jobs.Registry).TestingResumerCreationKnobs = map[jobspb.Type]func(raw jobs.Resumer) jobs.Resumer{
jobspb.TypeImport: func(raw jobs.Resumer) jobs.Resumer {
r := raw.(*importResumer)
r.testingKnobs.onSetupFinish = func() {
close(setupDoneCh)
}
return r
},
}
}

sqlDB := sqlutils.MakeSQLRunner(conn)
testFiles := makeCSVData(t, numFiles, rowsPerFile, nodes, rowsPerRaceFile)

sqlDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b STRING)`)

var jobID int
row := sqlDB.QueryRow(t, fmt.Sprintf("IMPORT INTO t (a, b) CSV DATA (%s) WITH DETACHED", strings.Join(testFiles.files, ",")))
row.Scan(&jobID)
<-setupDoneCh
sqlDB.Exec(t, fmt.Sprintf("CANCEL JOB %d", jobID))
sqlDB.Exec(t, fmt.Sprintf("SHOW JOB WHEN COMPLETE %d", jobID))
}

// Verify that a failed import will clean up after itself. This means:
// - Delete the garbage data that it partially imported.
// - Delete the table descriptor for the table that was created during the
