From e613fdcc102bf0ae427a599a4cbc1128a852e0d4 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Sun, 10 May 2020 17:00:49 -0700 Subject: [PATCH 01/10] colexec: fix performance inefficiency in materializer We mistakenly were passing `sqlbase.DatumAlloc` by value, and not by pointer, and as a result we would always be allocating 16 datums but using only 1 - i.e. we were not only not pooling the allocations, but actually making a bunch of useless allocations as well. This inefficiency becomes noticeable when the vectorized query returns many rows and when we have wrapped processors and those processors get a lot of input rows - in all cases when we need to materialize a lot. For example, TPC-H query 16 sees about 10% improvement (it returns 18k rows) and TPC-DS query 6 sees 2x improvement (it has wrapped hash aggregator with a decimal column) with this fix. Release note (performance improvement): A performance inefficiency has been fixed in the vectorized execution engine which results in speed ups on all queries when run via the vectorized engine, with most noticeable gains on the queries that output many rows. --- pkg/sql/colexec/builtin_funcs.go | 2 +- pkg/sql/colexec/cfetcher.go | 2 +- pkg/sql/colexec/materializer.go | 2 +- pkg/sql/colexec/projection_ops_test.go | 4 ++-- pkg/sql/colexec/vec_elem_to_datum.go | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/sql/colexec/builtin_funcs.go b/pkg/sql/colexec/builtin_funcs.go index abd471853445..3f6708efc578 100644 --- a/pkg/sql/colexec/builtin_funcs.go +++ b/pkg/sql/colexec/builtin_funcs.go @@ -72,7 +72,7 @@ func (b *defaultBuiltinFuncOperator) Next(ctx context.Context) coldata.Batch { for j := range b.argumentCols { col := batch.ColVec(b.argumentCols[j]) - b.row[j] = PhysicalTypeColElemToDatum(col, rowIdx, b.da, b.columnTypes[b.argumentCols[j]]) + b.row[j] = PhysicalTypeColElemToDatum(col, rowIdx, &b.da, b.columnTypes[b.argumentCols[j]]) hasNulls = hasNulls || b.row[j] == tree.DNull } diff --git a/pkg/sql/colexec/cfetcher.go b/pkg/sql/colexec/cfetcher.go index bbe8ec3030e2..299b51cd22a3 100644 --- a/pkg/sql/colexec/cfetcher.go +++ b/pkg/sql/colexec/cfetcher.go @@ -897,7 +897,7 @@ func (rf *cFetcher) pushState(state fetcherState) { // getDatumAt returns the converted datum object at the given (colIdx, rowIdx). // This function is meant for tracing and should not be used in hot paths. 
func (rf *cFetcher) getDatumAt(colIdx int, rowIdx int, typ *types.T) tree.Datum { - return PhysicalTypeColElemToDatum(rf.machine.colvecs[colIdx], rowIdx, rf.table.da, typ) + return PhysicalTypeColElemToDatum(rf.machine.colvecs[colIdx], rowIdx, &rf.table.da, typ) } // processValue processes the state machine's current value component, setting diff --git a/pkg/sql/colexec/materializer.go b/pkg/sql/colexec/materializer.go index 9900d77c1844..9d2481b8f338 100644 --- a/pkg/sql/colexec/materializer.go +++ b/pkg/sql/colexec/materializer.go @@ -173,7 +173,7 @@ func (m *Materializer) next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadat typs := m.OutputTypes() for colIdx := 0; colIdx < len(typs); colIdx++ { col := m.batch.ColVec(colIdx) - m.row[colIdx].Datum = PhysicalTypeColElemToDatum(col, rowIdx, m.da, typs[colIdx]) + m.row[colIdx].Datum = PhysicalTypeColElemToDatum(col, rowIdx, &m.da, typs[colIdx]) } return m.ProcessRowHelper(m.row), nil } diff --git a/pkg/sql/colexec/projection_ops_test.go b/pkg/sql/colexec/projection_ops_test.go index 2024c046a8fb..a8c8bacb3cac 100644 --- a/pkg/sql/colexec/projection_ops_test.go +++ b/pkg/sql/colexec/projection_ops_test.go @@ -266,8 +266,8 @@ func TestRandomComparisons(t *testing.T) { ) } for i := range lDatums { - lDatums[i] = PhysicalTypeColElemToDatum(lVec, i, da, typ) - rDatums[i] = PhysicalTypeColElemToDatum(rVec, i, da, typ) + lDatums[i] = PhysicalTypeColElemToDatum(lVec, i, &da, typ) + rDatums[i] = PhysicalTypeColElemToDatum(rVec, i, &da, typ) } for _, cmpOp := range []tree.ComparisonOperator{tree.EQ, tree.NE, tree.LT, tree.LE, tree.GT, tree.GE} { for i := range lDatums { diff --git a/pkg/sql/colexec/vec_elem_to_datum.go b/pkg/sql/colexec/vec_elem_to_datum.go index 48fc2096254f..79f6c6695860 100644 --- a/pkg/sql/colexec/vec_elem_to_datum.go +++ b/pkg/sql/colexec/vec_elem_to_datum.go @@ -29,7 +29,7 @@ import ( // that this function handles nulls as well, so there is no need for a separate // null check. func PhysicalTypeColElemToDatum( - col coldata.Vec, rowIdx int, da sqlbase.DatumAlloc, ct *types.T, + col coldata.Vec, rowIdx int, da *sqlbase.DatumAlloc, ct *types.T, ) tree.Datum { if col.MaybeHasNulls() { if col.Nulls().NullAt(rowIdx) { From 8d67e149b4bb53a100a4d9a43955cf1349341395 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Sun, 10 May 2020 22:24:56 -0700 Subject: [PATCH 02/10] sqlbase: prohibit copying DatumAlloc by value This commit adds `_ util.NoCopy` to `DatumAlloc` struct to prevent us from misusing it. A few places failed the linter, and those have been addressed, but there was no performance problems AFAICT due to the removed copies by value. 
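As an illustration of both problems (this sketch is not part of the patch;
`noCopy` and `alloc` below are stand-ins for `util.NoCopy` and
`sqlbase.DatumAlloc`, and it assumes `util.NoCopy` follows the usual
pointer-receiver Lock/Unlock marker pattern that `go vet`'s copylocks check
recognizes): passing the allocator by value copies the slice header, so every
call re-allocates a fresh batch instead of drawing from the existing one, and
the marker field makes `go vet` flag any such copy.

    package main

    import "fmt"

    // noCopy is a zero-sized marker. go vet's copylocks check reports any
    // by-value copy of a struct containing a type with pointer-receiver
    // Lock/Unlock methods.
    type noCopy struct{}

    func (*noCopy) Lock()   {}
    func (*noCopy) Unlock() {}

    // alloc stands in for DatumAlloc: it hands out values from a batch so
    // that N requests cost roughly one allocation instead of N.
    type alloc struct {
        _   noCopy
        buf []int
    }

    func (a *alloc) newInt(v int) *int {
        if len(a.buf) == 0 {
            a.buf = make([]int, 16) // one batch of 16, like DatumAlloc
        }
        r := &a.buf[0]
        a.buf = a.buf[1:]
        *r = v
        return r
    }

    func main() {
        var a alloc
        // Both values are served from the same batch because `a` is used
        // through a pointer receiver; a by-value copy of `a` would be
        // flagged by `go vet` thanks to the noCopy field.
        fmt.Println(*a.newInt(1), *a.newInt(2))
    }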
Release note: None --- pkg/sql/colexec/cfetcher.go | 18 +++++------------- pkg/sql/conn_executor.go | 2 +- pkg/sql/copy.go | 2 +- pkg/sql/copy_file_upload.go | 2 +- pkg/sql/create_table.go | 2 +- pkg/sql/delete_range.go | 2 +- pkg/sql/join_test.go | 2 +- pkg/sql/opt_exec_factory.go | 16 ++++++++-------- pkg/sql/planner.go | 4 ++-- pkg/sql/planner_test.go | 3 ++- pkg/sql/rowcontainer/disk_row_container.go | 7 ++++--- .../rowcontainer/disk_row_container_test.go | 8 ++++---- pkg/sql/rowexec/distinct.go | 14 +++++++++++--- pkg/sql/rowexec/indexbackfiller.go | 2 +- pkg/sql/sqlbase/datum_alloc.go | 8 +++++++- 15 files changed, 50 insertions(+), 42 deletions(-) diff --git a/pkg/sql/colexec/cfetcher.go b/pkg/sql/colexec/cfetcher.go index 299b51cd22a3..9ef12cc459f1 100644 --- a/pkg/sql/colexec/cfetcher.go +++ b/pkg/sql/colexec/cfetcher.go @@ -162,7 +162,7 @@ func (m colIdxMap) get(c sqlbase.ColumnID) (int, bool) { // } type cFetcher struct { // table is the table that's configured for fetching. - table cTableInfo + table *cTableInfo // reverse denotes whether or not the spans should be read in reverse // or not when StartScan is invoked. @@ -266,7 +266,6 @@ func (rf *cFetcher) Init( } tableArgs := tables[0] - oldTable := rf.table m := colIdxMap{ vals: make(sqlbase.ColumnIDs, 0, len(tableArgs.ColIdxMap)), @@ -278,20 +277,13 @@ func (rf *cFetcher) Init( } sort.Sort(m) colDescriptors := tableArgs.Cols - table := cTableInfo{ + table := &cTableInfo{ spans: tableArgs.Spans, desc: tableArgs.Desc, colIdxMap: m, index: tableArgs.Index, isSecondaryIndex: tableArgs.IsSecondaryIndex, cols: colDescriptors, - - // These slice fields might get re-allocated below, so reslice them from - // the old table here in case they've got enough capacity already. - indexColOrdinals: oldTable.indexColOrdinals[:0], - extraValColOrdinals: oldTable.extraValColOrdinals[:0], - allIndexColOrdinals: oldTable.allIndexColOrdinals[:0], - allExtraValColOrdinals: oldTable.allExtraValColOrdinals[:0], } typs := make([]*types.T, len(colDescriptors)) @@ -403,7 +395,7 @@ func (rf *cFetcher) Init( if err != nil { return err } - if cHasExtraCols(&table) { + if cHasExtraCols(table) { // Unique secondary indexes have a value that is the // primary index key. // Primary indexes only contain ascendingly-encoded @@ -908,7 +900,7 @@ func (rf *cFetcher) getDatumAt(colIdx int, rowIdx int, typ *types.T) tree.Datum func (rf *cFetcher) processValue( ctx context.Context, familyID sqlbase.FamilyID, ) (prettyKey string, prettyValue string, err error) { - table := &rf.table + table := rf.table if rf.traceKV { var buf strings.Builder @@ -1211,7 +1203,7 @@ func (rf *cFetcher) processValueTuple( } func (rf *cFetcher) fillNulls() error { - table := &rf.table + table := rf.table if rf.machine.remainingValueColsByIdx.Empty() { return nil } diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 8d38ff762e20..1ba8ae0d6073 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -592,7 +592,7 @@ func (s *Server) newConnExecutor( settings: s.cfg.Settings, }, memMetrics: memMetrics, - planner: planner{execCfg: s.cfg}, + planner: planner{execCfg: s.cfg, alloc: &sqlbase.DatumAlloc{}}, // ctxHolder will be reset at the start of run(). We only define // it here so that an early call to close() doesn't panic. 
diff --git a/pkg/sql/copy.go b/pkg/sql/copy.go index b3e28c3e2d70..65ee8354f9e5 100644 --- a/pkg/sql/copy.go +++ b/pkg/sql/copy.go @@ -104,7 +104,7 @@ func newCopyMachine( columns: n.Columns, txnOpt: txnOpt, // The planner will be prepared before use. - p: planner{execCfg: execCfg}, + p: planner{execCfg: execCfg, alloc: &sqlbase.DatumAlloc{}}, execInsertPlan: execInsertPlan, } diff --git a/pkg/sql/copy_file_upload.go b/pkg/sql/copy_file_upload.go index 372877743e5b..854702a09000 100644 --- a/pkg/sql/copy_file_upload.go +++ b/pkg/sql/copy_file_upload.go @@ -57,7 +57,7 @@ func newFileUploadMachine( c := ©Machine{ conn: conn, // The planner will be prepared before use. - p: planner{execCfg: execCfg}, + p: planner{execCfg: execCfg, alloc: &sqlbase.DatumAlloc{}}, } f = &fileUploadMachine{ c: c, diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index cc9b85487369..d82a01a33a28 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -394,7 +394,7 @@ func (n *createTableNode) startExec(params runParams) error { desc.Columns, row.SkipFKs, nil, /* fkTables */ - ¶ms.p.alloc) + params.p.alloc) if err != nil { return err } diff --git a/pkg/sql/delete_range.go b/pkg/sql/delete_range.go index 87dfc5aba03e..8990ca53ad98 100644 --- a/pkg/sql/delete_range.go +++ b/pkg/sql/delete_range.go @@ -192,7 +192,7 @@ func (d *deleteRangeNode) startExec(params runParams) error { sqlbase.ScanLockingStrength_FOR_NONE, false, /* returnRangeInfo */ false, /* isCheck */ - ¶ms.p.alloc, + params.p.alloc, allTables..., ); err != nil { return err diff --git a/pkg/sql/join_test.go b/pkg/sql/join_test.go index ca476225b6e5..2ba6fd2e4930 100644 --- a/pkg/sql/join_test.go +++ b/pkg/sql/join_test.go @@ -24,7 +24,7 @@ import ( func newTestScanNode(kvDB *kv.DB, tableName string) (*scanNode, error) { desc := sqlbase.GetImmutableTableDescriptor(kvDB, keys.SystemSQLCodec, sqlutils.TestDB, tableName) - p := planner{} + p := planner{alloc: &sqlbase.DatumAlloc{}} scan := p.Scan() scan.desc = desc err := scan.initDescDefaults(p.curPlan.deps, publicColumnsCfg) diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 00fe83e9c3db..b81878903af9 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -1231,7 +1231,7 @@ func (ef *execFactory) ConstructInsert( } // Create the table inserter, which does the bulk of the work. ri, err := row.MakeInserter( - ctx, ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, colDescs, checkFKs, fkTables, &ef.planner.alloc, + ctx, ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, colDescs, checkFKs, fkTables, ef.planner.alloc, ) if err != nil { return nil, err @@ -1296,7 +1296,7 @@ func (ef *execFactory) ConstructInsertFastPath( // Create the table inserter, which does the bulk of the work. 
ri, err := row.MakeInserter( - ctx, ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, colDescs, row.SkipFKs, nil /* fkTables */, &ef.planner.alloc, + ctx, ef.planner.txn, ef.planner.ExecCfg().Codec, tabDesc, colDescs, row.SkipFKs, nil /* fkTables */, ef.planner.alloc, ) if err != nil { return nil, err @@ -1408,7 +1408,7 @@ func (ef *execFactory) ConstructUpdate( row.UpdaterDefault, checkFKs, ef.planner.EvalContext(), - &ef.planner.alloc, + ef.planner.alloc, ) if err != nil { return nil, err @@ -1554,7 +1554,7 @@ func (ef *execFactory) ConstructUpsert( insertColDescs, checkFKs, fkTables, - &ef.planner.alloc, + ef.planner.alloc, ) if err != nil { return nil, err @@ -1572,7 +1572,7 @@ func (ef *execFactory) ConstructUpsert( row.UpdaterDefault, checkFKs, ef.planner.EvalContext(), - &ef.planner.alloc, + ef.planner.alloc, ) if err != nil { return nil, err @@ -1599,7 +1599,7 @@ func (ef *execFactory) ConstructUpsert( insertCols: ri.InsertCols, tw: optTableUpserter{ ri: ri, - alloc: &ef.planner.alloc, + alloc: ef.planner.alloc, canaryOrdinal: int(canaryCol), fkTables: fkTables, fetchCols: fetchColDescs, @@ -1688,7 +1688,7 @@ func (ef *execFactory) ConstructDelete( fetchColDescs, checkFKs, ef.planner.EvalContext(), - &ef.planner.alloc, + ef.planner.alloc, ) if err != nil { return nil, err @@ -1703,7 +1703,7 @@ func (ef *execFactory) ConstructDelete( *del = deleteNode{ source: input.(planNode), run: deleteRun{ - td: tableDeleter{rd: rd, alloc: &ef.planner.alloc}, + td: tableDeleter{rd: rd, alloc: ef.planner.alloc}, }, } diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index c6f408cf3584..664787d1236f 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -182,7 +182,7 @@ type planner struct { // Use a common datum allocator across all the plan nodes. This separates the // plan lifetime from the lifetime of returned results allowing plan nodes to // be pool allocated. - alloc sqlbase.DatumAlloc + alloc *sqlbase.DatumAlloc // optPlanningCtx stores the optimizer planning context, which contains // data structures that can be reused between queries (for efficiency). 
@@ -269,7 +269,7 @@ func newInternalPlanner( ts = readTimestamp.GoTime() } - p := &planner{execCfg: execCfg} + p := &planner{execCfg: execCfg, alloc: &sqlbase.DatumAlloc{}} p.txn = txn p.stmt = nil diff --git a/pkg/sql/planner_test.go b/pkg/sql/planner_test.go index 3d45bd6b4c04..b9e16a32a896 100644 --- a/pkg/sql/planner_test.go +++ b/pkg/sql/planner_test.go @@ -15,12 +15,13 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/leaktest" ) func TestTypeAsString(t *testing.T) { defer leaktest.AfterTest(t)() - p := planner{} + p := planner{alloc: &sqlbase.DatumAlloc{}} testData := []struct { expr tree.Expr expected string diff --git a/pkg/sql/rowcontainer/disk_row_container.go b/pkg/sql/rowcontainer/disk_row_container.go index 622114bb05b6..f4304dd01a5d 100644 --- a/pkg/sql/rowcontainer/disk_row_container.go +++ b/pkg/sql/rowcontainer/disk_row_container.go @@ -65,7 +65,7 @@ type DiskRowContainer struct { diskMonitor *mon.BytesMonitor engine diskmap.Factory - datumAlloc sqlbase.DatumAlloc + datumAlloc *sqlbase.DatumAlloc } var _ SortableRowContainer = &DiskRowContainer{} @@ -92,6 +92,7 @@ func MakeDiskRowContainer( scratchEncRow: make(sqlbase.EncDatumRow, len(types)), diskMonitor: diskMonitor, engine: e, + datumAlloc: &sqlbase.DatumAlloc{}, } d.bufferedRows = d.diskMap.NewBatchWriter() @@ -144,14 +145,14 @@ func (d *DiskRowContainer) AddRow(ctx context.Context, row sqlbase.EncDatumRow) for i, orderInfo := range d.ordering { col := orderInfo.ColIdx var err error - d.scratchKey, err = row[col].Encode(d.types[col], &d.datumAlloc, d.encodings[i], d.scratchKey) + d.scratchKey, err = row[col].Encode(d.types[col], d.datumAlloc, d.encodings[i], d.scratchKey) if err != nil { return err } } for _, i := range d.valueIdxs { var err error - d.scratchVal, err = row[i].Encode(d.types[i], &d.datumAlloc, sqlbase.DatumEncoding_VALUE, d.scratchVal) + d.scratchVal, err = row[i].Encode(d.types[i], d.datumAlloc, sqlbase.DatumEncoding_VALUE, d.scratchVal) if err != nil { return err } diff --git a/pkg/sql/rowcontainer/disk_row_container_test.go b/pkg/sql/rowcontainer/disk_row_container_test.go index e4e6d1ff7bdc..ab6606a51c42 100644 --- a/pkg/sql/rowcontainer/disk_row_container_test.go +++ b/pkg/sql/rowcontainer/disk_row_container_test.go @@ -150,14 +150,14 @@ func TestDiskRowContainer(t *testing.T) { // Ensure the datum fields are set and no errors occur when // decoding. for i, encDatum := range readRow { - if err := encDatum.EnsureDecoded(typs[i], &d.datumAlloc); err != nil { + if err := encDatum.EnsureDecoded(typs[i], d.datumAlloc); err != nil { t.Fatal(err) } } // Check equality of the row we wrote and the row we read. for i := range row { - if cmp, err := readRow[i].Compare(typs[i], &d.datumAlloc, &evalCtx, &row[i]); err != nil { + if cmp, err := readRow[i].Compare(typs[i], d.datumAlloc, &evalCtx, &row[i]); err != nil { t.Fatal(err) } else if cmp != 0 { t.Fatalf("encoded %s but decoded %s", row.String(typs), readRow.String(typs)) @@ -213,14 +213,14 @@ func TestDiskRowContainer(t *testing.T) { // Ensure datum fields are set and no errors occur when // decoding. for i, encDatum := range row { - if err := encDatum.EnsureDecoded(types[i], &d.datumAlloc); err != nil { + if err := encDatum.EnsureDecoded(types[i], d.datumAlloc); err != nil { t.Fatal(err) } } // Check sorted order. 
if cmp, err := compareRows( - types, sortedRows.EncRow(numKeysRead), row, &evalCtx, &d.datumAlloc, ordering, + types, sortedRows.EncRow(numKeysRead), row, &evalCtx, d.datumAlloc, ordering, ); err != nil { t.Fatal(err) } else if cmp != 0 { diff --git a/pkg/sql/rowexec/distinct.go b/pkg/sql/rowexec/distinct.go index eef19965fe5b..3b29b37c1656 100644 --- a/pkg/sql/rowexec/distinct.go +++ b/pkg/sql/rowexec/distinct.go @@ -112,12 +112,20 @@ func newDistinct( var returnProcessor execinfra.RowSourcedProcessor = d if allSorted { // We can use the faster sortedDistinct processor. + // TODO(asubiotto): We should have a distinctBase, rather than making a copy + // of a distinct processor. sd := &sortedDistinct{ - distinct: *d, + distinct: distinct{ + input: input, + orderedCols: spec.OrderedColumns, + distinctCols: distinctCols, + memAcc: memMonitor.MakeBoundAccount(), + types: input.OutputTypes(), + nullsAreDistinct: spec.NullsAreDistinct, + errorOnDup: spec.ErrorOnDup, + }, } // Set d to the new distinct copy for further initialization. - // TODO(asubiotto): We should have a distinctBase, rather than making a copy - // of a distinct processor. d = &sd.distinct returnProcessor = sd } diff --git a/pkg/sql/rowexec/indexbackfiller.go b/pkg/sql/rowexec/indexbackfiller.go index 2c4635779701..8f603285c74a 100644 --- a/pkg/sql/rowexec/indexbackfiller.go +++ b/pkg/sql/rowexec/indexbackfiller.go @@ -106,7 +106,7 @@ func (ib *indexBackfiller) prepare(ctx context.Context) error { return nil } -func (ib indexBackfiller) close(ctx context.Context) { +func (ib *indexBackfiller) close(ctx context.Context) { ib.adder.Close(ctx) } diff --git a/pkg/sql/sqlbase/datum_alloc.go b/pkg/sql/sqlbase/datum_alloc.go index 6417e6321f5b..aea5381172a5 100644 --- a/pkg/sql/sqlbase/datum_alloc.go +++ b/pkg/sql/sqlbase/datum_alloc.go @@ -10,11 +10,17 @@ package sqlbase -import "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" +import ( + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util" +) // DatumAlloc provides batch allocation of datum pointers, amortizing the cost // of the allocations. +// NOTE: it *must* be passed in by a pointer. type DatumAlloc struct { + _ util.NoCopy + datumAlloc []tree.Datum dintAlloc []tree.DInt dfloatAlloc []tree.DFloat From 4c860fd211a9a74791141b53703a0af5870e6bb7 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Sun, 10 May 2020 16:31:13 -0700 Subject: [PATCH 03/10] colexec: short-circuit hash join when build side is empty Previously, we would always fully consume both sides of the join, even when the build (right) side is empty. We can, however, short-circuit in such case for INNER, RIGHT OUTER, and LEFT SEMI joins and skip probing phase altogether in such scenarios. For example, this helps query 93 of TPC-DS benchmark with scale factor 1. Release note: None --- pkg/sql/colexec/hashjoiner.go | 40 ++++++++++++++++++++++++------ pkg/sql/colexec/hashjoiner_test.go | 6 +---- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/pkg/sql/colexec/hashjoiner.go b/pkg/sql/colexec/hashjoiner.go index d4d15b397ced..2457f1855d5d 100644 --- a/pkg/sql/colexec/hashjoiner.go +++ b/pkg/sql/colexec/hashjoiner.go @@ -26,19 +26,25 @@ import ( type hashJoinerState int const ( - // hjBuilding represents the state the hashJoiner when it is in the build - // phase. Output columns from the build table are stored and a hash map is - // constructed from its equality columns. 
+ // hjBuilding represents the state the hashJoiner is in when it is in the + // build phase. Output columns from the build table are stored and a hash + // map is constructed from its equality columns. hjBuilding = iota - // hjProbing represents the state the hashJoiner is in when it is in the probe - // phase. Probing is done in batches against the stored hash map. + // hjProbing represents the state the hashJoiner is in when it is in the + // probe phase. Probing is done in batches against the stored hash map. hjProbing // hjEmittingUnmatched represents the state the hashJoiner is in when it is // emitting unmatched rows from its build table after having consumed the // probe table. This happens in the case of an outer join on the build side. hjEmittingUnmatched + + // hjDone represents the state the hashJoiner is in when it has finished + // emitting all output rows. Note that the build side will have been fully + // consumed in this state, but the probe side *might* have not been fully + // consumed. + hjDone ) // hashJoinerSpec is the specification for a hash join operator. The hash @@ -236,18 +242,38 @@ func (hj *hashJoiner) Next(ctx context.Context) coldata.Batch { switch hj.state { case hjBuilding: hj.build(ctx) + if hj.ht.vals.Length() == 0 { + // The build side is empty, so we can short-circuit probing + // phase altogether for INNER, RIGHT OUTER, and LEFT SEMI + // joins. + if hj.spec.joinType == sqlbase.JoinType_INNER || + hj.spec.joinType == sqlbase.JoinType_RIGHT_OUTER || + hj.spec.joinType == sqlbase.JoinType_LEFT_SEMI { + hj.state = hjDone + } + } continue case hjProbing: hj.exec(ctx) - if hj.output.Length() == 0 && hj.spec.right.outer { - hj.state = hjEmittingUnmatched + if hj.output.Length() == 0 { + if hj.spec.right.outer { + hj.state = hjEmittingUnmatched + } else { + hj.state = hjDone + } continue } return hj.output case hjEmittingUnmatched: + if hj.emittingUnmatchedState.rowIdx == hj.ht.vals.Length() { + hj.state = hjDone + continue + } hj.emitUnmatched() return hj.output + case hjDone: + return coldata.ZeroBatch default: colexecerror.InternalError("hash joiner in unhandled state") // This code is unreachable, but the compiler cannot infer that. diff --git a/pkg/sql/colexec/hashjoiner_test.go b/pkg/sql/colexec/hashjoiner_test.go index 0bdf6867d6d6..e769ac05763f 100644 --- a/pkg/sql/colexec/hashjoiner_test.go +++ b/pkg/sql/colexec/hashjoiner_test.go @@ -1168,8 +1168,7 @@ func TestHashJoinerProjection(t *testing.T) { hjOp, err := NewColOperator(ctx, flowCtx, args) require.NoError(t, err) hjOp.Op.Init() - for { - b := hjOp.Op.Next(ctx) + for b := hjOp.Op.Next(ctx); b.Length() > 0; b = hjOp.Op.Next(ctx) { // The output types should be {Int64, Int64, Bool, Decimal, Float64, Bytes} // and we check this explicitly. 
b.ColVec(0).Int64() @@ -1178,8 +1177,5 @@ func TestHashJoinerProjection(t *testing.T) { b.ColVec(3).Decimal() b.ColVec(4).Float64() b.ColVec(5).Bytes() - if b.Length() == 0 { - break - } } } From cd11a75f3ae7138b9acde013f3a206a276f7ae32 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Sun, 15 Mar 2020 17:00:38 -0400 Subject: [PATCH 04/10] kvserver: enable rangefeed unconditionally on system ranges Release note: None --- pkg/kv/kvserver/client_rangefeed_test.go | 104 ++++++++++++++++++++++ pkg/kv/kvserver/client_replica_test.go | 69 ++++++++++++++ pkg/kv/kvserver/replica.go | 13 ++- pkg/kv/kvserver/replica_rangefeed.go | 2 +- pkg/kv/kvserver/replica_rangefeed_test.go | 22 ++++- pkg/kv/kvserver/replica_test.go | 58 ------------ pkg/kv/kvserver/replica_write.go | 2 +- 7 files changed, 205 insertions(+), 65 deletions(-) create mode 100644 pkg/kv/kvserver/client_rangefeed_test.go diff --git a/pkg/kv/kvserver/client_rangefeed_test.go b/pkg/kv/kvserver/client_rangefeed_test.go new file mode 100644 index 000000000000..3d9fea7e0323 --- /dev/null +++ b/pkg/kv/kvserver/client_rangefeed_test.go @@ -0,0 +1,104 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestRangefeedWorksOnSystemRangesUnconditionally ensures that a rangefeed will +// not return an error when operating on a system span even if the setting is +// disabled. The test also ensures that an error is received if a rangefeed is +// run on a user table. +func TestRangefeedWorksOnSystemRangesUnconditionally(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + // Make sure the rangefeed setting really is disabled. + _, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = false") + require.NoError(t, err) + + db := tc.Server(0).DB() + ds := tc.Server(0).DistSenderI().(*kvcoord.DistSender) + + t.Run("works on system ranges", func(t *testing.T) { + startTS := db.Clock().Now() + descTableKey := keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID) + descTableSpan := roachpb.Span{ + Key: descTableKey, + EndKey: descTableKey.PrefixEnd(), + } + + evChan := make(chan *roachpb.RangeFeedEvent) + rangefeedErrChan := make(chan error, 1) + ctxToCancel, cancel := context.WithCancel(ctx) + go func() { + rangefeedErrChan <- ds.RangeFeed(ctxToCancel, descTableSpan, startTS, false /* withDiff */, evChan) + }() + + // Note: 42 is a system descriptor. 
+ const junkDescriptorID = 42 + require.GreaterOrEqual(t, keys.MaxReservedDescID, junkDescriptorID) + junkDescriptorKey := sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, junkDescriptorID) + junkDescriptor := sqlbase.WrapDescriptor(&sqlbase.DatabaseDescriptor{ + Name: "junk", + ID: junkDescriptorID, + }) + require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if err := txn.SetSystemConfigTrigger(); err != nil { + return err + } + return txn.Put(ctx, junkDescriptorKey, junkDescriptor) + })) + after := db.Clock().Now() + for { + ev := <-evChan + if ev.Checkpoint != nil && after.Less(ev.Checkpoint.ResolvedTS) { + t.Fatal("expected to see write which occurred before the checkpoint") + } + + if ev.Val != nil && ev.Val.Key.Equal(junkDescriptorKey) { + var gotProto sqlbase.Descriptor + require.NoError(t, ev.Val.Value.GetProto(&gotProto)) + require.EqualValues(t, junkDescriptor, &gotProto) + break + } + } + cancel() + // There are several cases that seems like they can happen due + // to closed connections. Instead we just expect an error. + // The main point is we get an error in a timely manner. + require.Error(t, <-rangefeedErrChan) + }) + t.Run("does not work on user ranges", func(t *testing.T) { + k := tc.ScratchRange(t) + require.NoError(t, tc.WaitForSplitAndInitialization(k)) + startTS := db.Clock().Now() + scratchSpan := roachpb.Span{Key: k, EndKey: k.PrefixEnd()} + evChan := make(chan *roachpb.RangeFeedEvent) + require.Regexp(t, `rangefeeds require the kv\.rangefeed.enabled setting`, + ds.RangeFeed(ctx, scratchSpan, startTS, false /* withDiff */, evChan)) + }) +} diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index e02419f94205..471122d0910b 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -43,6 +43,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/caller" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -51,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/kr/pretty" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft/raftpb" @@ -3138,6 +3140,73 @@ func TestStrictGCEnforcement(t *testing.T) { }) } +// TestProposalOverhead ensures that the command overhead for put operations +// is as expected. It exists to prevent changes which might increase the +// byte overhead of replicating commands. +// +// Note that it intentionally avoids using a system range which incurs the +// overhead due to the logical op log. +func TestProposalOverhead(t *testing.T) { + defer leaktest.AfterTest(t)() + + var overhead uint32 + var key atomic.Value + key.Store(roachpb.Key{}) + filter := func(args storagebase.ProposalFilterArgs) *roachpb.Error { + if len(args.Req.Requests) != 1 { + return nil + } + req, ok := args.Req.GetArg(roachpb.Put) + if !ok { + return nil + } + put := req.(*roachpb.PutRequest) + if !bytes.Equal(put.Key, key.Load().(roachpb.Key)) { + return nil + } + // Sometime the logical portion of the timestamp can be non-zero which makes + // the overhead non-deterministic. 
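+		// Zeroing the logical component keeps args.Cmd.Size(), and therefore
+		// the overhead recorded below, stable across runs.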
+ args.Cmd.ReplicatedEvalResult.Timestamp.Logical = 0 + atomic.StoreUint32(&overhead, uint32(args.Cmd.Size()-args.Cmd.WriteBatch.Size())) + // We don't want to print the WriteBatch because it's explicitly + // excluded from the size computation. Nil'ing it out does not + // affect the memory held by the caller because neither `args` nor + // `args.Cmd` are pointers. + args.Cmd.WriteBatch = nil + t.Logf(pretty.Sprint(args.Cmd)) + return nil + } + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{TestingProposalFilter: filter}, + }, + }, + }) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + + db := tc.Server(0).DB() + // NB: the expected overhead reflects the space overhead currently + // present in Raft commands. This test will fail if that overhead + // changes. Try to make this number go down and not up. It slightly + // undercounts because our proposal filter is called before + // maxLeaseIndex is filled in. The difference between the user and system + // overhead is that users ranges do not have rangefeeds on by default whereas + // system ranges do. + const ( + expectedUserOverhead uint32 = 42 + ) + t.Run("user-key overhead", func(t *testing.T) { + userKey := tc.ScratchRange(t) + k := roachpb.Key(encoding.EncodeStringAscending(userKey, "foo")) + key.Store(k) + require.NoError(t, db.Put(ctx, k, "v")) + require.Equal(t, expectedUserOverhead, atomic.LoadUint32(&overhead)) + }) + +} + // getRangeInfo retreives range info by performing a get against the provided // key and setting the ReturnRangeInfo flag to true. func getRangeInfo( diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 671c564b9af0..939f7b05dcd7 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -778,7 +778,7 @@ func (r *Replica) getImpliedGCThresholdRLocked( // The GC threshold is the oldest value we can return here. if isAdmin || !StrictGCEnforcement.Get(&r.store.ClusterSettings().SV) || - r.mu.state.Desc.StartKey.Less(roachpb.RKey(keys.UserTableDataMin)) { + r.isSystemRangeRLocked() { return threshold } @@ -806,6 +806,17 @@ func (r *Replica) getImpliedGCThresholdRLocked( return threshold } +// isSystemRange returns true if r's key range precedes keys.UserTableDataMin. +func (r *Replica) isSystemRange() bool { + r.mu.RLock() + defer r.mu.RUnlock() + return r.isSystemRangeRLocked() +} + +func (r *Replica) isSystemRangeRLocked() bool { + return r.mu.state.Desc.StartKey.Less(roachpb.RKey(keys.UserTableDataMin)) +} + // maxReplicaIDOfAny returns the maximum ReplicaID of any replica, including // voters and learners. func maxReplicaIDOfAny(desc *roachpb.RangeDescriptor) roachpb.ReplicaID { diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index ab72c998bb80..5f32e23f5919 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -130,7 +130,7 @@ func (i iteratorWithCloser) Close() { func (r *Replica) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, ) *roachpb.Error { - if !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { + if !r.isSystemRangeRLocked() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. 
See %s", base.DocsURL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)) } diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 76ad35e4b121..5c159fc0707f 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" @@ -23,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -661,17 +663,28 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { assertRangefeedRetryErr(t, pErr, roachpb.RangeFeedRetryError_REASON_RAFT_SNAPSHOT) }) t.Run(roachpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING.String(), func(t *testing.T) { - mtc, rangeID := setup(t) + mtc, _ := setup(t) defer mtc.Stop() + // Split the range so that the RHS is not a system range and thus will + // respect the rangefeed_enabled cluster setting. + startKey := keys.UserTableDataMin + splitArgs := adminSplitArgs(startKey) + if _, pErr := kv.SendWrapped(ctx, mtc.distSenders[0], splitArgs); pErr != nil { + t.Fatalf("split saw unexpected error: %v", pErr) + } + rightRangeID := mtc.Store(0).LookupReplica(roachpb.RKey(startKey)).RangeID + // Establish a rangefeed. stream := newTestStream() streamErrC := make(chan *roachpb.Error, 1) - rangefeedSpan := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")} + + endKey := keys.TableDataMax + rangefeedSpan := roachpb.Span{Key: startKey, EndKey: endKey} go func() { req := roachpb.RangeFeedRequest{ Header: roachpb.Header{ - RangeID: rangeID, + RangeID: rightRangeID, }, Span: rangefeedSpan, } @@ -688,7 +701,8 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) { kvserver.RangefeedEnabled.Override(&mtc.storeConfig.Settings.SV, false) // Perform a write on the range. 
- pArgs := putArgs(roachpb.Key("c"), []byte("val2")) + writeKey := encoding.EncodeStringAscending(keys.SystemSQLCodec.TablePrefix(55), "c") + pArgs := putArgs(writeKey, []byte("val2")) if _, pErr := kv.SendWrapped(ctx, mtc.distSenders[0], pArgs); pErr != nil { t.Fatal(pErr) } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 18f7dca004f8..2191baeda760 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -6725,64 +6725,6 @@ func TestBatchErrorWithIndex(t *testing.T) { } } -func TestProposalOverhead(t *testing.T) { - defer leaktest.AfterTest(t)() - - key := roachpb.Key("k") - var overhead uint32 - - manual := hlc.NewManualClock(123) - tc := testContext{manualClock: manual} - cfg := TestStoreConfig(hlc.NewClock(manual.UnixNano, time.Nanosecond)) - cfg.TestingKnobs.TestingProposalFilter = - func(args storagebase.ProposalFilterArgs) *roachpb.Error { - if len(args.Req.Requests) != 1 { - return nil - } - req, ok := args.Req.GetArg(roachpb.Put) - if !ok { - return nil - } - put := req.(*roachpb.PutRequest) - if !bytes.Equal(put.Key, key) { - return nil - } - atomic.StoreUint32(&overhead, uint32(args.Cmd.Size()-args.Cmd.WriteBatch.Size())) - // We don't want to print the WriteBatch because it's explicitly - // excluded from the size computation. Nil'ing it out does not - // affect the memory held by the caller because neither `args` nor - // `args.Cmd` are pointers. - args.Cmd.WriteBatch = nil - t.Logf(pretty.Sprint(args.Cmd)) - return nil - } - stopper := stop.NewStopper() - defer stopper.Stop(context.TODO()) - tc.StartWithStoreConfig(t, stopper, cfg) - - ba := roachpb.BatchRequest{} - // Use a realistic timestamp with no logical portion to guarantee that the - // timestamp in the proposal has a deterministic size. - ba.Timestamp = hlc.Timestamp{WallTime: 1588747910671904812} - ba.Add(&roachpb.PutRequest{ - RequestHeader: roachpb.RequestHeader{Key: key}, - Value: roachpb.MakeValueFromString("v"), - }) - if _, pErr := tc.Sender().Send(context.Background(), ba); pErr != nil { - t.Fatal(pErr) - } - - // NB: the expected overhead reflects the space overhead currently - // present in Raft commands. This test will fail if that overhead - // changes. Try to make this number go down and not up. It slightly - // undercounts because our proposal filter is called before - // maxLeaseIndex is filled in. - const expectedOverhead = 42 - if v := atomic.LoadUint32(&overhead); expectedOverhead != v { - t.Fatalf("expected overhead of %d, but found %d", expectedOverhead, v) - } -} - // TestReplicaLoadSystemConfigSpanIntent verifies that intents on the SystemConfigSpan // cause an error, but trigger asynchronous cleanup. 
func TestReplicaLoadSystemConfigSpanIntent(t *testing.T) { diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index ead9d9a04704..3ef7345d4083 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -572,7 +572,7 @@ func (r *Replica) evaluateWriteBatchWrapper( func (r *Replica) newBatchedEngine(spans *spanset.SpanSet) (storage.Batch, *storage.OpLoggerBatch) { batch := r.store.Engine().NewBatch() var opLogger *storage.OpLoggerBatch - if RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { + if r.isSystemRange() || RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { // TODO(nvanbenschoten): once we get rid of the RangefeedEnabled // cluster setting we'll need a way to turn this on when any // replica (not just the leaseholder) wants it and off when no From 273a56ccd4782fd65561a89613434dcbbc992b6d Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 14 Apr 2020 23:26:27 -0400 Subject: [PATCH 05/10] clusterversion: add version to enable rangefeeds by default for system config Before this commit, rangefeeds were enabled on ranges only if the cluster setting `kv.rangefeed.enabled` was set to true. That's not going to fly if we're going to have critical system functionality rely on rangefeeds. We still don't want users paying for the penalty of rangefeeds unless they opt in but we don't care about the ~5% overhead for system config operations. Release note: None --- docs/generated/settings/settings.html | 2 +- pkg/clusterversion/cockroach_versions.go | 12 ++++++++++++ pkg/clusterversion/versionkey_string.go | 5 +++-- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index c5aed4a03808..244aa92f2498 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -70,6 +70,6 @@ trace.debug.enablebooleanfalseif set, traces for recent requests can be seen in the /debug page trace.lightstep.tokenstringif set, traces go to Lightstep using this token trace.zipkin.collectorstringif set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set -versioncustom validation20.1-3set the active cluster version in the format '.' +versioncustom validation20.1-4set the active cluster version in the format '.' diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index b0b4c3acfbfa..7decde2809f1 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -63,6 +63,7 @@ const ( VersionStart20_2 VersionGeospatialType VersionEnums + VersionRangefeedLeases // Add new versions here (step one of two). ) @@ -471,6 +472,7 @@ var versionsSingleton = keyedVersions([]keyedVersion{ Version: roachpb.Version{Major: 20, Minor: 1, Unstable: 1}, }, { + // VersionGeospatialType enables the use of Geospatial features. Key: VersionGeospatialType, Version: roachpb.Version{Major: 20, Minor: 1, Unstable: 2}, @@ -480,6 +482,16 @@ var versionsSingleton = keyedVersions([]keyedVersion{ Key: VersionEnums, Version: roachpb.Version{Major: 20, Minor: 1, Unstable: 3}, }, + { + + // VersionRangefeedLeases is the enablement of leases uses rangefeeds. + // All nodes with this versions will have rangefeeds enabled on all system + // ranges. Once this version is finalized, gossip is not needed in the + // schema lease subsystem. Nodes which start with this version finalized + // will not pass gossip to the SQL layer. 
+ Key: VersionRangefeedLeases, + Version: roachpb.Version{Major: 20, Minor: 1, Unstable: 4}, + }, // Add new versions here (step two of two). diff --git a/pkg/clusterversion/versionkey_string.go b/pkg/clusterversion/versionkey_string.go index 461882872d51..a7956ca8b6a5 100644 --- a/pkg/clusterversion/versionkey_string.go +++ b/pkg/clusterversion/versionkey_string.go @@ -39,11 +39,12 @@ func _() { _ = x[VersionStart20_2-28] _ = x[VersionGeospatialType-29] _ = x[VersionEnums-30] + _ = x[VersionRangefeedLeases-31] } -const _VersionKey_name = "Version19_1VersionStart19_2VersionLearnerReplicasVersionTopLevelForeignKeysVersionAtomicChangeReplicasTriggerVersionAtomicChangeReplicasVersionTableDescModificationTimeFromMVCCVersionPartitionedBackupVersion19_2VersionStart20_1VersionContainsEstimatesCounterVersionChangeReplicasDemotionVersionSecondaryIndexColumnFamiliesVersionNamespaceTableWithSchemasVersionProtectedTimestampsVersionPrimaryKeyChangesVersionAuthLocalAndTrustRejectMethodsVersionPrimaryKeyColumnsOutOfFamilyZeroVersionRootPasswordVersionNoExplicitForeignKeyIndexIDsVersionHashShardedIndexesVersionCreateRolePrivilegeVersionStatementDiagnosticsSystemTablesVersionSchemaChangeJobVersionSavepointsVersionTimeTZTypeVersionTimePrecisionVersion20_1VersionStart20_2VersionGeospatialTypeVersionEnums" +const _VersionKey_name = "Version19_1VersionStart19_2VersionLearnerReplicasVersionTopLevelForeignKeysVersionAtomicChangeReplicasTriggerVersionAtomicChangeReplicasVersionTableDescModificationTimeFromMVCCVersionPartitionedBackupVersion19_2VersionStart20_1VersionContainsEstimatesCounterVersionChangeReplicasDemotionVersionSecondaryIndexColumnFamiliesVersionNamespaceTableWithSchemasVersionProtectedTimestampsVersionPrimaryKeyChangesVersionAuthLocalAndTrustRejectMethodsVersionPrimaryKeyColumnsOutOfFamilyZeroVersionRootPasswordVersionNoExplicitForeignKeyIndexIDsVersionHashShardedIndexesVersionCreateRolePrivilegeVersionStatementDiagnosticsSystemTablesVersionSchemaChangeJobVersionSavepointsVersionTimeTZTypeVersionTimePrecisionVersion20_1VersionStart20_2VersionGeospatialTypeVersionEnumsVersionRangefeedLeases" -var _VersionKey_index = [...]uint16{0, 11, 27, 49, 75, 109, 136, 176, 200, 211, 227, 258, 287, 322, 354, 380, 404, 441, 480, 499, 534, 559, 585, 624, 646, 663, 680, 700, 711, 727, 748, 760} +var _VersionKey_index = [...]uint16{0, 11, 27, 49, 75, 109, 136, 176, 200, 211, 227, 258, 287, 322, 354, 380, 404, 441, 480, 499, 534, 559, 585, 624, 646, 663, 680, 700, 711, 727, 748, 760, 782} func (i VersionKey) String() string { if i < 0 || i >= VersionKey(len(_VersionKey_index)-1) { From cdc345e3dd2b054a45aa732cf22c3af69cf92f8d Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 16 Mar 2020 09:07:08 -0400 Subject: [PATCH 06/10] sql: implement lease manager on top of rangefeeds The pickle we'll want to eventually deal with is a way of plumbing into the system the idea that we don't need this gossip dependency. We need to make sure that we start tracking events for a table before we successfully send off the request to read the initial value. TODO(ajwerner): Maybe just allow it to be nil but note that if it is nil then you're going to have a bad time if for whatever reason this is a mixed version cluster (i.e. return an error when the cluster version hasn't been finalized and). 
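For orientation, the rangefeed-driven path added below boils down to the
following condensed sketch (the types and calls are the ones used in this
diff; the version gate, retries, cancellation, and testing knobs are omitted,
and `watchDescriptors`/`updateCh` are names made up for the example):

    package sql

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/keys"
        "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
        "github.com/cockroachdb/cockroach/pkg/util/hlc"
    )

    // watchDescriptors runs a rangefeed over the descriptor table span and
    // turns every committed descriptor write into a *TableDescriptor update.
    func watchDescriptors(
        ctx context.Context,
        ds *kvcoord.DistSender,
        startTS hlc.Timestamp,
        updateCh chan<- *sqlbase.TableDescriptor,
    ) error {
        prefix := keys.TODOSQLCodec.TablePrefix(uint32(sqlbase.DescriptorTable.ID))
        span := roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}
        eventCh := make(chan *roachpb.RangeFeedEvent)
        go func() {
            for ev := range eventCh {
                // Only value events matter here; the real code also watches
                // checkpoints and errors and selects on ctx.Done().
                if ev.Val == nil || len(ev.Val.Value.RawBytes) == 0 {
                    continue
                }
                var desc sqlbase.Descriptor
                if err := ev.Val.Value.GetProto(&desc); err != nil {
                    continue
                }
                if table := desc.Table(ev.Val.Value.Timestamp); table != nil {
                    updateCh <- table
                }
            }
        }()
        // Blocks until the feed terminates; the real code restarts it on error.
        return ds.RangeFeed(ctx, span, startTS, false /* withDiff */, eventCh)
    }

Every update received on `updateCh` then drives `purgeOldVersions`, just as
the gossip-driven updates did.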
Release note: None --- pkg/server/server_sql.go | 6 +- pkg/sql/lease.go | 340 ++++++++++++++++++++++++++------- pkg/sql/lease_internal_test.go | 3 +- pkg/sql/lease_test.go | 122 +++++++++--- pkg/sql/rename_test.go | 37 +--- 5 files changed, 388 insertions(+), 120 deletions(-) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 1ca58dea4390..ae19a1a33e6a 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -535,9 +535,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { ) execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry - leaseMgr.RefreshLeases(cfg.stopper, cfg.db, cfg.gossip) - leaseMgr.PeriodicallyRefreshSomeLeases() - temporaryObjectCleaner := sql.NewTemporaryObjectCleaner( cfg.Settings, cfg.db, @@ -592,6 +589,9 @@ func (s *sqlServer) start( if migrationManagerTestingKnobs := knobs.SQLMigrationManager; migrationManagerTestingKnobs != nil { mmKnobs = *migrationManagerTestingKnobs.(*sqlmigrations.MigrationManagerTestingKnobs) } + + s.leaseMgr.RefreshLeases(stopper, s.execCfg.DB, s.execCfg.Gossip) + s.leaseMgr.PeriodicallyRefreshSomeLeases() migrationsExecutor := sql.MakeInternalExecutor( ctx, s.pgServer.SQLServer, s.internalMemMetrics, s.execCfg.Settings) migrationsExecutor.SetSessionData( diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index baf4053836af..810242300f8f 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -22,10 +22,11 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings" @@ -890,6 +891,9 @@ func (m *LeaseManager) insertTableVersions(tableID sqlbase.ID, versions []*table // inserts it into the active set. It guarantees that the lease returned is // the one acquired after the call is made. Use this if the lease we want to // get needs to see some descriptor updates that we know happened recently. +// +// TODO(ajwerner): This really just needs to acquire a lease on the ID newer +// than something. Not necessarily newer than func (m *LeaseManager) AcquireFreshestFromStore(ctx context.Context, tableID sqlbase.ID) error { // Create tableState if needed. _ = m.findTableState(tableID, true /* create */) @@ -1259,16 +1263,23 @@ var _ base.ModuleTestingKnobs = &LeaseStoreTestingKnobs{} // LeaseManagerTestingKnobs contains test knobs. type LeaseManagerTestingKnobs struct { - // A callback called when a gossip update is received, before the leases are - // refreshed. Careful when using this to block for too long - you can block - // all the gossip users in the system. If it returns an error the gossip - // update is ignored. - GossipUpdateEvent func(*config.SystemConfig) error + // A callback called after the leases are refreshed as a result of a gossip update. - TestingLeasesRefreshedEvent func(*config.SystemConfig) + TestingTableRefreshedEvent func(descriptor *sqlbase.TableDescriptor) + + // TestingTableUpdateEvent is a callback when an update is received, before + // the leases are refreshed. If a non-nil error is returned, the update is + // ignored. 
+ TestingTableUpdateEvent func(descriptor *sqlbase.TableDescriptor) error + // To disable the deletion of orphaned leases at server startup. DisableDeleteOrphanedLeases bool - LeaseStoreTestingKnobs LeaseStoreTestingKnobs + + // AlwaysUseRangefeeds ensures that rangefeeds and not gossip are used to + // detect changes to table descriptors. + AlwaysUseRangefeeds bool + + LeaseStoreTestingKnobs LeaseStoreTestingKnobs } var _ base.ModuleTestingKnobs = &LeaseManagerTestingKnobs{} @@ -1414,6 +1425,8 @@ type LeaseManager struct { ambientCtx log.AmbientContext stopper *stop.Stopper sem *quotapool.IntPool + + initialTimestamp hlc.Timestamp } const leaseConcurrencyLimit = 5 @@ -1454,9 +1467,10 @@ func NewLeaseManager( tableNames: tableNameCache{ tables: make(map[tableNameCacheKey]*tableVersionState), }, - ambientCtx: ambientCtx, - stopper: stopper, - sem: quotapool.NewIntPool("lease manager", leaseConcurrencyLimit), + ambientCtx: ambientCtx, + stopper: stopper, + sem: quotapool.NewIntPool("lease manager", leaseConcurrencyLimit), + initialTimestamp: clock.Now(), } lm.stopper.AddCloser(lm.sem.Closer("stopper")) lm.mu.tables = make(map[sqlbase.ID]*tableState) @@ -1771,73 +1785,271 @@ func (m *LeaseManager) findTableState(tableID sqlbase.ID, create bool) *tableSta } // RefreshLeases starts a goroutine that refreshes the lease manager -// leases for tables received in the latest system configuration via gossip. -func (m *LeaseManager) RefreshLeases(s *stop.Stopper, db *kv.DB, gw gossip.DeprecatedGossip) { +// leases for tables received in the latest system configuration via gossip or +// rangefeeds. This function must be passed a non-nil gossip if Version +func (m *LeaseManager) RefreshLeases(s *stop.Stopper, db *kv.DB, g gossip.DeprecatedGossip) { + + // Okay so we want to create a rangefeed and use that. + m.initialTimestamp = db.Clock().Now() + // Do we need to use the gossip at all? Seems like nah. Oh we need to because + // we don't know if rangefeeds are enabled. Grr. I guess the deal is that when + // it gets enabled then we flip the switch? We don't want to force the nodes + // to restart to get the goodness. We also want to be able to stop gossipping. + + // We're going to kick off a function to do the updating based on the gossip + // and then we'll also do one based on the rangefeeds and they'll synchronize. + // + // TODO(ajwerner): Fix this up, start out by assuming we're going to rely on + // gossip, depending on the minimum accepted version, then switch based on + // the finalization from the cluster setting or if the gossip is nil. + // + // We want an upgrade path such that after you finalize the 20.2 upgrade, the + // descriptor table is no longer gossipped. 
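+	// The worker below spins until the active cluster version is known and
+	// then hands off to refreshLeases, which picks the gossip-based or the
+	// rangefeed-based watcher depending on whether VersionRangefeedLeases
+	// (or the AlwaysUseRangefeeds testing knob) is active.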
+ + s.RunWorker(context.TODO(), func(ctx context.Context) { + for r := retry.Start(retry.Options{}); r.Next(); { + if m.settings.Version.ActiveVersionOrEmpty(ctx) != (clusterversion.ClusterVersion{}) { + break + } + } + m.refreshLeases(s, g, db) + }) +} + +func (m *LeaseManager) refreshLeases(s *stop.Stopper, g gossip.DeprecatedGossip, db *kv.DB) { ctx := context.TODO() + tableUpdateCh := make(chan *sqlbase.TableDescriptor) + ver := m.settings.Version.ActiveVersion(ctx) + fmt.Println(ver) + if m.testingKnobs.AlwaysUseRangefeeds || m.settings.Version.IsActive(ctx, clusterversion.VersionRangefeedLeases) { + m.watchForRangefeedUpdates(ctx, s, db, tableUpdateCh) + } else { + m.watchForGossipUpdates(ctx, s, g, tableUpdateCh) + } + s.RunWorker(ctx, func(ctx context.Context) { - descKeyPrefix := m.codec.TablePrefix(uint32(sqlbase.DescriptorTable.ID)) - cfgFilter := gossip.MakeSystemConfigDeltaFilter(descKeyPrefix) - gossipUpdateC := gw.DeprecatedRegisterSystemConfigChannel(47150) for { select { - case <-gossipUpdateC: - cfg := gw.DeprecatedSystemConfig(47150) - if m.testingKnobs.GossipUpdateEvent != nil { - if err := m.testingKnobs.GossipUpdateEvent(cfg); err != nil { - break + case table := <-tableUpdateCh: + // NB: We allow nil tables to be sent to synchronize the updating of + // tables. + if table == nil { + continue + } + + if evFunc := m.testingKnobs.TestingTableUpdateEvent; evFunc != nil { + if err := evFunc(table); err != nil { + log.Infof(ctx, "skipping table update of %v due to knob: %v", + table, err) } } - // Read all tables and their versions - if log.V(2) { - log.Info(ctx, "received a new config; will refresh leases") + + // Try to refresh the table lease to one >= this version. + if err := purgeOldVersions( + ctx, db, table.ID, table.GoingOffline(), table.Version, m); err != nil { + log.Warningf(ctx, "error purging leases for table %d(%s): %s", + table.ID, table.Name, err) + } + if evFunc := m.testingKnobs.TestingTableRefreshedEvent; evFunc != nil { + evFunc(table) } - cfgFilter.ForModified(cfg, func(kv roachpb.KeyValue) { - // Attempt to unmarshal config into a table/database descriptor. - var descriptor sqlbase.Descriptor - if err := kv.Value.GetProto(&descriptor); err != nil { - log.Warningf(ctx, "%s: unable to unmarshal descriptor %v", kv.Key, kv.Value) - return - } - switch union := descriptor.Union.(type) { - case *sqlbase.Descriptor_Table: - table := union.Table - // Note that we don't need to "fill in" the descriptor here. Nobody - // actually reads the table, but it's necessary for the call to - // ValidateTable(). - if err := table.MaybeFillInDescriptor(ctx, nil, m.codec); err != nil { - log.Warningf(ctx, "%s: unable to fill in table descriptor %v", kv.Key, table) - return - } - if err := table.ValidateTable(); err != nil { - log.Errorf(ctx, "%s: received invalid table descriptor: %s. Desc: %v", - kv.Key, err, table, - ) - return - } - if log.V(2) { - log.Infof(ctx, "%s: refreshing lease table: %d (%s), version: %d, dropped: %t", - kv.Key, table.ID, table.Name, table.Version, table.Dropped()) - } - // Try to refresh the table lease to one >= this version. - if err := purgeOldVersions( - ctx, db, table.ID, table.GoingOffline(), table.Version, m); err != nil { - log.Warningf(ctx, "error purging leases for table %d(%s): %s", - table.ID, table.Name, err) - } - case *sqlbase.Descriptor_Database: - // Ignore. 
- } - }) - if m.testingKnobs.TestingLeasesRefreshedEvent != nil { - m.testingKnobs.TestingLeasesRefreshedEvent(cfg) + case <-s.ShouldQuiesce(): + return + } + } + }) +} + +func (m *LeaseManager) watchForGossipUpdates( + ctx context.Context, + s *stop.Stopper, + g gossip.DeprecatedGossip, + tableUpdateCh chan<- *sqlbase.TableDescriptor, +) { + if _, err := g.OptionalErr(47150); err != nil { + log.Fatalf(ctx, "required gossip until %v is active: %v", clusterversion.VersionRangefeedLeases, err) + } + + s.RunWorker(ctx, func(ctx context.Context) { + descKeyPrefix := m.codec.TablePrefix(uint32(sqlbase.DescriptorTable.ID)) + gossipUpdateC := g.DeprecatedRegisterSystemConfigChannel(47150) + filter := gossip.MakeSystemConfigDeltaFilter(descKeyPrefix) + + ctx, cancel := s.WithCancelOnQuiesce(ctx) + defer cancel() + for { + select { + case <-gossipUpdateC: + m.handleUpdatedSystemCfg(ctx, g, &filter, tableUpdateCh) + case <-s.ShouldQuiesce(): + return + } + } + }) +} + +func (m *LeaseManager) watchForRangefeedUpdates( + ctx context.Context, s *stop.Stopper, db *kv.DB, tableUpdateCh chan<- *sqlbase.TableDescriptor, +) { + // We're going to want to create a rangefeed. + + // We need to make sure that before we attempt to acquire a lease for a table + // that we register the need to track tables and their versions. Then we + // need to keep track of the states of the tables so that when a lease returns + // that we know whether it's still valid. + // + // This sort of race isn't necessarily handled by gossip but the lack of + // specificity in the gossip stream mitigated the importance of missing an + // event. + if log.V(1) { + log.Infof(ctx, "using rangefeeds for lease manager updates") + } + distSender := db.NonTransactionalSender().(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender) + eventCh := make(chan *roachpb.RangeFeedEvent) + ctx, _ = s.WithCancelOnQuiesce(ctx) + if err := s.RunAsyncTask(ctx, "lease rangefeed", func(ctx context.Context) { + for { + descKeyPrefix := keys.TODOSQLCodec.TablePrefix(uint32(sqlbase.DescriptorTable.ID)) + span := roachpb.Span{ + Key: descKeyPrefix, + EndKey: descKeyPrefix.PrefixEnd(), + } + // TODO(ajwerner): consider using withDiff to detect whether a version + // change occurred. + const withDiff = false + log.VEventf(ctx, 1, "starting rangefeed from %v on %v", m.initialTimestamp, span) + // TODO(ajwerner): track the resolved timestamp + err := distSender.RangeFeed(ctx, span, m.initialTimestamp, withDiff, eventCh) + if err != nil && ctx.Err() == nil { + log.Warningf(ctx, "lease rangefeed failed, restarting: %v", err) + } + if ctx.Err() != nil { + log.VEventf(ctx, 0, "exiting rangefeed") + return + } + } + }); err != nil { + // This will only fail if the stopper has been stopped. + return + } + handleEvent := func(ev *roachpb.RangeFeedValue) { + if len(ev.Value.RawBytes) == 0 { + return + } + var descriptor sqlbase.Descriptor + if err := ev.Value.GetProto(&descriptor); err != nil { + log.ReportOrPanic(ctx, &m.settings.SV, + "%s: unable to unmarshal descriptor %v", ev.Key, ev.Value) + return + } + table := descriptor.Table(ev.Value.Timestamp) + if table == nil { + return + } + + // Note that we don't need to "fill in" the descriptor here. Nobody + // actually reads the table, but it's necessary for the call to + // ValidateTable(). 
+ if err := table.MaybeFillInDescriptor(ctx, nil, m.codec); err != nil { + log.Warningf(ctx, "%s: unable to fill in table descriptor %v", ev.Key, table) + return + } + if err := table.ValidateTable(); err != nil { + log.Errorf(ctx, "%s: received invalid table descriptor: %s. Desc: %v", + ev.Key, err, table, + ) + return + } + if log.V(2) { + log.Infof(ctx, "%s: refreshing lease table: %d (%s), version: %d, dropped: %t", + ev.Key, table.ID, table.Name, table.Version, table.Dropped()) + } + select { + case <-ctx.Done(): + case tableUpdateCh <- table: + } + return + } + s.RunWorker(ctx, func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case e := <-eventCh: + if e.Checkpoint != nil { + log.Infof(ctx, "got checkpoint %v", e.Checkpoint) + continue + } + if e.Error != nil { + log.Warningf(ctx, "got an error from a rangefeed: %v", e.Error.Error) + continue } + if e.Val != nil { + handleEvent(e.Val) + } + } + } + }) +} + +func (m *LeaseManager) handleUpdatedSystemCfg( + ctx context.Context, + g gossip.DeprecatedGossip, + cfgFilter *gossip.SystemConfigDeltaFilter, + tableUpdateChan chan<- *sqlbase.TableDescriptor, +) { + cfg := g.DeprecatedSystemConfig(47150) + // Read all tables and their versions + if log.V(2) { + log.Info(ctx, "received a new config; will refresh leases") + } - case <-s.ShouldStop(): + cfgFilter.ForModified(cfg, func(kv roachpb.KeyValue) { + // Attempt to unmarshal config into a table/database descriptor. + var descriptor sqlbase.Descriptor + if err := kv.Value.GetProto(&descriptor); err != nil { + log.Warningf(ctx, "%s: unable to unmarshal descriptor %v", kv.Key, kv.Value) + return + } + switch union := descriptor.Union.(type) { + case *sqlbase.Descriptor_Table: + table := union.Table + // Note that we don't need to "fill in" the descriptor here. Nobody + // actually reads the table, but it's necessary for the call to + // ValidateTable(). + if err := table.MaybeFillInDescriptor(ctx, nil, m.codec); err != nil { + log.Warningf(ctx, "%s: unable to fill in table descriptor %v", kv.Key, table) return } + if err := table.ValidateTable(); err != nil { + log.Errorf(ctx, "%s: received invalid table descriptor: %s. Desc: %v", + kv.Key, err, table, + ) + return + } + if log.V(2) { + log.Infof(ctx, "%s: refreshing lease table: %d (%s), version: %d, dropped: %t", + kv.Key, table.ID, table.Name, table.Version, table.Dropped()) + } + select { + case <-ctx.Done(): + case tableUpdateChan <- table: + } + + case *sqlbase.Descriptor_Database: + // Ignore. } }) + + // Attempt to shove a nil table descriptor into the channel to ensure that + // we've processed all of the events previously sent. + select { + case <-ctx.Done(): + // If we've been canceled, the other size of the channel will also have + // been canceled. 
+ case tableUpdateChan <- nil: + } } // tableLeaseRefreshLimit is the upper-limit on the number of table leases diff --git a/pkg/sql/lease_internal_test.go b/pkg/sql/lease_internal_test.go index 3ad8530b1426..3e5f0fc7d9e4 100644 --- a/pkg/sql/lease_internal_test.go +++ b/pkg/sql/lease_internal_test.go @@ -20,7 +20,6 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -123,7 +122,7 @@ func TestPurgeOldVersions(t *testing.T) { serverParams := base.TestServerArgs{ Knobs: base.TestingKnobs{ SQLLeaseManager: &LeaseManagerTestingKnobs{ - GossipUpdateEvent: func(cfg *config.SystemConfig) error { + TestingTableUpdateEvent: func(t *sqlbase.TableDescriptor) error { gossipSem <- struct{}{} <-gossipSem return nil diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index 8441afeb863d..946bc31c5e23 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -23,7 +23,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -569,20 +569,6 @@ CREATE TABLE test.t(a INT PRIMARY KEY); } } -func isDeleted(tableID sqlbase.ID, cfg *config.SystemConfig) bool { - descKey := sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, tableID) - val := cfg.GetValue(descKey) - if val == nil { - return false - } - var descriptor sqlbase.Descriptor - if err := val.GetProto(&descriptor); err != nil { - panic("unable to unmarshal table descriptor") - } - table := descriptor.Table(val.Timestamp) - return table.Dropped() -} - func acquire( ctx context.Context, s *server.TestServer, descID sqlbase.ID, ) (*sqlbase.ImmutableTableDescriptor, hlc.Timestamp, error) { @@ -604,14 +590,15 @@ func TestLeasesOnDeletedTableAreReleasedImmediately(t *testing.T) { params, _ := tests.CreateTestServerParams() params.Knobs = base.TestingKnobs{ SQLLeaseManager: &sql.LeaseManagerTestingKnobs{ - TestingLeasesRefreshedEvent: func(cfg *config.SystemConfig) { + TestingTableRefreshedEvent: func(table *sqlbase.TableDescriptor) { mu.Lock() defer mu.Unlock() - if waitTableID != 0 { - if isDeleted(waitTableID, cfg) { - close(deleted) - waitTableID = 0 - } + if waitTableID != table.ID { + return + } + if table.Dropped() { + close(deleted) + waitTableID = 0 } }, }, @@ -1722,7 +1709,7 @@ func TestLeaseRenewedPeriodically(testingT *testing.T) { atomic.AddInt32(&testAcquisitionBlockCount, 1) }, }, - GossipUpdateEvent: func(cfg *config.SystemConfig) error { + TestingTableUpdateEvent: func(_ *sqlbase.TableDescriptor) error { return errors.Errorf("ignore gossip update") }, }, @@ -2158,3 +2145,94 @@ func TestIntentOnSystemConfigDoesNotPreventSchemaChange(t *testing.T) { require.NoError(t, err) } } + +// TestIntentOnSystemConfigDoesNotPreventSchemaChange tests that failures to +// gossip the system config due to intents are rectified when later intents +// are aborted. 
+func TestIntentOnSystemConfigDoesNotPreventSchemaChange2(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLLeaseManager: &sql.LeaseManagerTestingKnobs{ + AlwaysUseRangefeeds: true, + }, + Server: &server.TestingKnobs{ + // TODO(ajwerner): A + + BootstrapVersionOverride: clusterversion.TestingBinaryVersion, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + db := tc.ServerConn(0) + tdb := sqlutils.MakeSQLRunner(db) + tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms';") + tdb.Exec(t, "SET CLUSTER SETTING kv.rangefeed.enabled = true;") + tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)") + time.Sleep(time.Second) + + log.Infof(ctx, "created table at %v", tc.Server(0).Clock().Now()) + connA, err := db.Conn(ctx) + require.NoError(t, err) + connB, err := db.Conn(ctx) + require.NoError(t, err) + + txA, err := connA.BeginTx(ctx, &gosql.TxOptions{}) + require.NoError(t, err) + txB, err := connB.BeginTx(ctx, &gosql.TxOptions{}) + require.NoError(t, err) + + // Lay down an intent on the system config span. + _, err = txA.Exec("CREATE TABLE bar (i INT PRIMARY KEY)") + require.NoError(t, err) + + _, err = txB.Exec("ALTER TABLE foo ADD COLUMN j INT NOT NULL DEFAULT 2") + require.NoError(t, err) + + getFooVersion := func() (version int) { + tx, err := db.Begin() + require.NoError(t, err) + // Prevent this transaction from blocking on intents. + _, err = tx.Exec("SET TRANSACTION PRIORITY HIGH") + require.NoError(t, err) + require.NoError(t, tx.QueryRow( + "SELECT version FROM crdb_internal.tables WHERE name = 'foo'"). + Scan(&version)) + require.NoError(t, tx.Commit()) + return version + } + + // Fire off the commit. In order to return, the table descriptor will need + // to make it through several versions. We wait until the version has been + // incremented once before we rollback txA. + origVersion := getFooVersion() + errCh := make(chan error) + go func() { + errCh <- txB.Commit() + log.Infof(ctx, "committed txn at %v", tc.Server(0).Clock().Now()) + }() + testutils.SucceedsSoon(t, func() error { + if got := getFooVersion(); got < origVersion { + return fmt.Errorf("got origVersion %d, expected greater", got) + } + return nil + }) + + // Roll back txA which had left an intent on the system config span which + // prevented the leaseholders of origVersion of foo from being notified. + // Ensure that those leaseholders are notified in a timely manner. + require.NoError(t, txA.Rollback()) + + const extremelyLongTime = 100 * time.Second + select { + case <-time.After(extremelyLongTime): + t.Fatalf("schema change did not complete in %v", extremelyLongTime) + case err := <-errCh: + require.NoError(t, err) + } +} diff --git a/pkg/sql/rename_test.go b/pkg/sql/rename_test.go index 68c4895dacdb..c0a0add2bcf8 100644 --- a/pkg/sql/rename_test.go +++ b/pkg/sql/rename_test.go @@ -16,7 +16,6 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -88,27 +87,6 @@ func TestRenameTable(t *testing.T) { } } -// isRenamed tests if a descriptor is updated by gossip to the specified name -// and version. 
-func isRenamed( - tableID sqlbase.ID, - expectedName string, - expectedVersion sqlbase.DescriptorVersion, - cfg *config.SystemConfig, -) bool { - descKey := sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, tableID) - val := cfg.GetValue(descKey) - if val == nil { - return false - } - var descriptor sqlbase.Descriptor - if err := val.GetProto(&descriptor); err != nil { - panic("unable to unmarshal table descriptor") - } - table := descriptor.Table(val.Timestamp) - return table.Name == expectedName && table.Version == expectedVersion -} - // Test that a SQL txn that resolved a name can keep resolving that name during // its lifetime even after the table has been renamed. // Also tests that the name of a renamed table cannot be reused until everybody @@ -142,15 +120,16 @@ func TestTxnCanStillResolveOldName(t *testing.T) { // leases using the old name (an update with the new name but the original // version is ignored by the leasing refresh mechanism). renamed := make(chan interface{}) - lmKnobs.TestingLeasesRefreshedEvent = - func(cfg *config.SystemConfig) { + lmKnobs.TestingTableRefreshedEvent = + func(table *sqlbase.TableDescriptor) { mu.Lock() defer mu.Unlock() - if waitTableID != 0 { - if isRenamed(waitTableID, "t2", 2, cfg) { - close(renamed) - waitTableID = 0 - } + if waitTableID != table.ID { + return + } + if table.Name == "t2" && table.Version == 2 { + close(renamed) + waitTableID = 0 } } s, db, kvDB := serverutils.StartServer(t, serverParams) From ce53846c83804f5cd9aacc2bb155991db613c42f Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 28 Apr 2020 10:28:37 -0400 Subject: [PATCH 07/10] kvserver: add TestingRangefeedFilter testing knob This knob is handy for injecting and observing rangefeed behavior. Release note: None --- pkg/kv/kvserver/storagebase/base.go | 6 ++++++ pkg/kv/kvserver/store.go | 7 +++++++ pkg/kv/kvserver/testing_knobs.go | 5 +++++ 3 files changed, 18 insertions(+) diff --git a/pkg/kv/kvserver/storagebase/base.go b/pkg/kv/kvserver/storagebase/base.go index 586d00fad8e5..7b57475e8b58 100644 --- a/pkg/kv/kvserver/storagebase/base.go +++ b/pkg/kv/kvserver/storagebase/base.go @@ -101,6 +101,12 @@ type ReplicaApplyFilter func(args ApplyFilterArgs) (int, *roachpb.Error) // been processed. This filter is invoked only by the command proposer. type ReplicaResponseFilter func(context.Context, roachpb.BatchRequest, *roachpb.BatchResponse) *roachpb.Error +// ReplicaRangefeedFilter is used in unit tests to modify the request, inject +// responses, or return errors from rangefeeds. +type ReplicaRangefeedFilter func( + args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, +) *roachpb.Error + // ContainsKey returns whether this range contains the specified key. 
func ContainsKey(desc *roachpb.RangeDescriptor, key roachpb.Key) bool { if bytes.HasPrefix(key, keys.LocalRangeIDPrefix) { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 02ad41479fad..4a7d9c7203e5 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2299,6 +2299,13 @@ func (s *Store) Descriptor(useCached bool) (*roachpb.StoreDescriptor, error) { func (s *Store) RangeFeed( args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer, ) *roachpb.Error { + + if filter := s.TestingKnobs().TestingRangefeedFilter; filter != nil { + if pErr := filter(args, stream); pErr != nil { + return pErr + } + } + if err := verifyKeys(args.Span.Key, args.Span.EndKey, true); err != nil { return roachpb.NewError(err) } diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 900c9307a71f..9e3b40e0734b 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -62,6 +62,11 @@ type StoreTestingKnobs struct { // error returned to the client, or to simulate network failures. TestingResponseFilter storagebase.ReplicaResponseFilter + // TestingRangefeedFilter is called before a replica processes a rangefeed + // in order for unit tests to modify the request, error returned to the client + // or data. + TestingRangefeedFilter storagebase.ReplicaRangefeedFilter + // A hack to manipulate the clock before sending a batch request to a replica. // TODO(kaneda): This hook is not encouraged to use. Get rid of it once // we make TestServer take a ManualClock. From b2dfb7f8f8363277fa55e98e0529c8f8dedcae95 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 29 Apr 2020 10:53:09 -0400 Subject: [PATCH 08/10] sql: ensure leases acquired concurrent with updates are handled properly This commit additionally has a bunch of cleanup of the previous commit. Release note: None --- pkg/server/server_sql.go | 5 +- pkg/sql/lease.go | 263 +++++++++++++++++++++++++++------------ pkg/sql/lease_test.go | 238 +++++++++++++++++++++++++---------- 3 files changed, 361 insertions(+), 145 deletions(-) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index ae19a1a33e6a..6476a16b00c1 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -590,8 +590,9 @@ func (s *sqlServer) start( mmKnobs = *migrationManagerTestingKnobs.(*sqlmigrations.MigrationManagerTestingKnobs) } - s.leaseMgr.RefreshLeases(stopper, s.execCfg.DB, s.execCfg.Gossip) - s.leaseMgr.PeriodicallyRefreshSomeLeases() + s.leaseMgr.RefreshLeases(ctx, stopper, s.execCfg.DB, s.execCfg.Gossip) + s.leaseMgr.PeriodicallyRefreshSomeLeases(ctx) + migrationsExecutor := sql.MakeInternalExecutor( ctx, s.pgServer.SQLServer, s.internalMemMetrics, s.execCfg.Settings) migrationsExecutor.SetSessionData( diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index 810242300f8f..65935ff01f19 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -723,6 +723,13 @@ type tableState struct { // If set, leases are released from the store as soon as their // refcount drops to 0, as opposed to waiting until they expire. dropped bool + + // acquisitionsInProgress indicates that at least one caller is currently + // in the process of performing an acquisition. This tracking is critical + // to ensure that notifications of new versions which arrive before a lease + // acquisition finishes but indicate that that new lease is expired are not + // ignored. 
+ acquisitionsInProgress int } } @@ -891,9 +898,6 @@ func (m *LeaseManager) insertTableVersions(tableID sqlbase.ID, versions []*table // inserts it into the active set. It guarantees that the lease returned is // the one acquired after the call is made. Use this if the lease we want to // get needs to see some descriptor updates that we know happened recently. -// -// TODO(ajwerner): This really just needs to acquire a lease on the ID newer -// than something. Not necessarily newer than func (m *LeaseManager) AcquireFreshestFromStore(ctx context.Context, tableID sqlbase.ID) error { // Create tableState if needed. _ = m.findTableState(tableID, true /* create */) @@ -941,7 +945,7 @@ func (m *LeaseManager) AcquireFreshestFromStore(ctx context.Context, tableID sql // it and returns it. func (t *tableState) upsertLocked( ctx context.Context, table *tableVersionState, -) (*storedTableLease, error) { +) (_ *storedTableLease, _ error) { s := t.mu.active.find(table.Version) if s == nil { if t.mu.active.findNewest() != nil { @@ -1140,7 +1144,7 @@ func purgeOldVersions( return nil } t.mu.Lock() - empty := len(t.mu.active.data) == 0 + empty := len(t.mu.active.data) == 0 && t.mu.acquisitionsInProgress == 0 t.mu.Unlock() if empty { // We don't currently have a version on this table, so no need to refresh @@ -1228,6 +1232,20 @@ func (t *tableState) startLeaseRenewal( atomic.StoreInt32(&t.renewalInProgress, 0) } +// markAcquisitionStart increments the acquisitionsInProgress counter. +func (t *tableState) markAcquisitionStart(ctx context.Context) { + t.mu.Lock() + defer t.mu.Unlock() + t.mu.acquisitionsInProgress++ +} + +// markAcquisitionDone decrements the acquisitionsInProgress counter. +func (t *tableState) markAcquisitionDone(ctx context.Context) { + t.mu.Lock() + defer t.mu.Unlock() + t.mu.acquisitionsInProgress-- +} + // LeaseAcquireBlockType is the type of blocking result event when // calling LeaseAcquireResultBlockEvent. type LeaseAcquireBlockType int @@ -1279,6 +1297,13 @@ type LeaseManagerTestingKnobs struct { // detect changes to table descriptors. AlwaysUseRangefeeds bool + // VersionPollIntervalForRangefeeds controls the polling interval for the + // check whether the requisite version for rangefeed-based notifications has + // been finalized. + // + // TODO(ajwerner): Remove this and replace it with a callback. + VersionPollIntervalForRangefeeds time.Duration + LeaseStoreTestingKnobs LeaseStoreTestingKnobs } @@ -1412,6 +1437,10 @@ type LeaseManager struct { mu struct { syncutil.Mutex tables map[sqlbase.ID]*tableState + + // updatesResolvedTimestamp keeps track of a timestamp before which all + // table updates have already been seen. 
+ updatesResolvedTimestamp hlc.Timestamp } draining atomic.Value @@ -1425,8 +1454,6 @@ type LeaseManager struct { ambientCtx log.AmbientContext stopper *stop.Stopper sem *quotapool.IntPool - - initialTimestamp hlc.Timestamp } const leaseConcurrencyLimit = 5 @@ -1467,13 +1494,13 @@ func NewLeaseManager( tableNames: tableNameCache{ tables: make(map[tableNameCacheKey]*tableVersionState), }, - ambientCtx: ambientCtx, - stopper: stopper, - sem: quotapool.NewIntPool("lease manager", leaseConcurrencyLimit), - initialTimestamp: clock.Now(), + ambientCtx: ambientCtx, + stopper: stopper, + sem: quotapool.NewIntPool("lease manager", leaseConcurrencyLimit), } lm.stopper.AddCloser(lm.sem.Closer("stopper")) lm.mu.tables = make(map[sqlbase.ID]*tableState) + lm.mu.updatesResolvedTimestamp = db.Clock().Now() lm.draining.Store(false) return lm @@ -1686,10 +1713,16 @@ func (m *LeaseManager) Acquire( } switch { case errors.Is(err, errRenewLease): - // Renew lease and retry. This will block until the lease is acquired. - if _, errLease := acquireNodeLease(ctx, m, tableID); errLease != nil { - return nil, hlc.Timestamp{}, errLease + if err := func() error { + t.markAcquisitionStart(ctx) + defer t.markAcquisitionDone(ctx) + // Renew lease and retry. This will block until the lease is acquired. + _, errLease := acquireNodeLease(ctx, m, tableID) + return errLease + }(); err != nil { + return nil, hlc.Timestamp{}, err } + if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil { m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(LeaseAcquireBlock) } @@ -1786,47 +1819,21 @@ func (m *LeaseManager) findTableState(tableID sqlbase.ID, create bool) *tableSta // RefreshLeases starts a goroutine that refreshes the lease manager // leases for tables received in the latest system configuration via gossip or -// rangefeeds. This function must be passed a non-nil gossip if Version -func (m *LeaseManager) RefreshLeases(s *stop.Stopper, db *kv.DB, g gossip.DeprecatedGossip) { - - // Okay so we want to create a rangefeed and use that. - m.initialTimestamp = db.Clock().Now() - // Do we need to use the gossip at all? Seems like nah. Oh we need to because - // we don't know if rangefeeds are enabled. Grr. I guess the deal is that when - // it gets enabled then we flip the switch? We don't want to force the nodes - // to restart to get the goodness. We also want to be able to stop gossipping. - - // We're going to kick off a function to do the updating based on the gossip - // and then we'll also do one based on the rangefeeds and they'll synchronize. - // - // TODO(ajwerner): Fix this up, start out by assuming we're going to rely on - // gossip, depending on the minimum accepted version, then switch based on - // the finalization from the cluster setting or if the gossip is nil. - // - // We want an upgrade path such that after you finalize the 20.2 upgrade, the - // descriptor table is no longer gossipped. - - s.RunWorker(context.TODO(), func(ctx context.Context) { - for r := retry.Start(retry.Options{}); r.Next(); { - if m.settings.Version.ActiveVersionOrEmpty(ctx) != (clusterversion.ClusterVersion{}) { - break - } - } - m.refreshLeases(s, g, db) +// rangefeeds. This function must be passed a non-nil gossip if +// VersionRangefeedLeases is not active. 
+func (m *LeaseManager) RefreshLeases( + ctx context.Context, s *stop.Stopper, db *kv.DB, g gossip.DeprecatedGossip, +) { + s.RunWorker(ctx, func(ctx context.Context) { + m.refreshLeases(ctx, g, db, s) }) } -func (m *LeaseManager) refreshLeases(s *stop.Stopper, g gossip.DeprecatedGossip, db *kv.DB) { - ctx := context.TODO() +func (m *LeaseManager) refreshLeases( + ctx context.Context, g gossip.DeprecatedGossip, db *kv.DB, s *stop.Stopper, +) { tableUpdateCh := make(chan *sqlbase.TableDescriptor) - ver := m.settings.Version.ActiveVersion(ctx) - fmt.Println(ver) - if m.testingKnobs.AlwaysUseRangefeeds || m.settings.Version.IsActive(ctx, clusterversion.VersionRangefeedLeases) { - m.watchForRangefeedUpdates(ctx, s, db, tableUpdateCh) - } else { - m.watchForGossipUpdates(ctx, s, g, tableUpdateCh) - } - + m.watchForUpdates(ctx, s, db, g, tableUpdateCh) s.RunWorker(ctx, func(ctx context.Context) { for { select { @@ -1845,11 +1852,14 @@ func (m *LeaseManager) refreshLeases(s *stop.Stopper, g gossip.DeprecatedGossip, } // Try to refresh the table lease to one >= this version. + log.VEventf(ctx, 2, "purging old version of table %d@%d (offline %v)", + table.ID, table.Version, table.GoingOffline()) if err := purgeOldVersions( ctx, db, table.ID, table.GoingOffline(), table.Version, m); err != nil { log.Warningf(ctx, "error purging leases for table %d(%s): %s", table.ID, table.Name, err) } + if evFunc := m.testingKnobs.TestingTableRefreshedEvent; evFunc != nil { evFunc(table) } @@ -1861,6 +1871,53 @@ func (m *LeaseManager) refreshLeases(s *stop.Stopper, g gossip.DeprecatedGossip, }) } +// watchForUpdates will watch either gossip or rangefeeds for updates. If the +// version does not currently support rangefeeds, gossip will be used until +// rangefeeds are supported, at which time, the system will shut down the +// gossip listener and start using rangefeeds. +func (m *LeaseManager) watchForUpdates( + ctx context.Context, + s *stop.Stopper, + db *kv.DB, + g gossip.DeprecatedGossip, + tableUpdateCh chan *sqlbase.TableDescriptor, +) { + useRangefeeds := m.testingKnobs.AlwaysUseRangefeeds || + m.settings.Version.IsActive(ctx, clusterversion.VersionRangefeedLeases) + if useRangefeeds { + m.watchForRangefeedUpdates(ctx, s, db, tableUpdateCh) + return + } + gossipCtx, cancelWatchingGossip := context.WithCancel(ctx) + m.watchForGossipUpdates(gossipCtx, s, g, tableUpdateCh) + canUseRangefeedsCh := m.waitForRangefeedsToBeUsable(ctx, s) + if err := s.RunAsyncTask(ctx, "wait for upgrade", func(ctx context.Context) { + select { + case <-s.ShouldQuiesce(): + return + case <-canUseRangefeedsCh: + // Note: It's okay that the cancelation of gossip watching is + // asynchronous. At worst we'd get duplicate updates or stale updates. + // Both of those are handled. + cancelWatchingGossip() + // Note: It's safe to start watching for rangefeeds now. We know that all + // nodes support rangefeeds in the system config span. Even though there + // may not have been logical ops for all operations in the log, the + // catch-up scan should take us up to the present. + // + // When the rangefeed starts up we'll pass it an initial timestamp which + // is no newer than all updates to the system config span we've already + // seen (see setResolvedTimestamp and its callers). The rangefeed API + // ensures that we will see all updates from on or before that timestamp + // at least once. + m.watchForRangefeedUpdates(ctx, s, db, tableUpdateCh) + } + }); err != nil { + // Note: this can only happen if the stopper has been stopped. 
+ return + } +} + func (m *LeaseManager) watchForGossipUpdates( ctx context.Context, s *stop.Stopper, @@ -1873,6 +1930,7 @@ func (m *LeaseManager) watchForGossipUpdates( s.RunWorker(ctx, func(ctx context.Context) { descKeyPrefix := m.codec.TablePrefix(uint32(sqlbase.DescriptorTable.ID)) + // TODO(ajwerner): Add a mechanism to unregister this channel upon return. gossipUpdateC := g.DeprecatedRegisterSystemConfigChannel(47150) filter := gossip.MakeSystemConfigDeltaFilter(descKeyPrefix) @@ -1892,16 +1950,6 @@ func (m *LeaseManager) watchForGossipUpdates( func (m *LeaseManager) watchForRangefeedUpdates( ctx context.Context, s *stop.Stopper, db *kv.DB, tableUpdateCh chan<- *sqlbase.TableDescriptor, ) { - // We're going to want to create a rangefeed. - - // We need to make sure that before we attempt to acquire a lease for a table - // that we register the need to track tables and their versions. Then we - // need to keep track of the states of the tables so that when a lease returns - // that we know whether it's still valid. - // - // This sort of race isn't necessarily handled by gossip but the lack of - // specificity in the gossip stream mitigated the importance of missing an - // event. if log.V(1) { log.Infof(ctx, "using rangefeeds for lease manager updates") } @@ -1910,22 +1958,22 @@ func (m *LeaseManager) watchForRangefeedUpdates( ctx, _ = s.WithCancelOnQuiesce(ctx) if err := s.RunAsyncTask(ctx, "lease rangefeed", func(ctx context.Context) { for { - descKeyPrefix := keys.TODOSQLCodec.TablePrefix(uint32(sqlbase.DescriptorTable.ID)) + ts := m.getResolvedTimestamp() + descKeyPrefix := m.codec.TablePrefix(uint32(sqlbase.DescriptorTable.ID)) span := roachpb.Span{ Key: descKeyPrefix, EndKey: descKeyPrefix.PrefixEnd(), } - // TODO(ajwerner): consider using withDiff to detect whether a version - // change occurred. + // Note: We don't need to use withDiff to detect version changes because + // the LeaseManager already stores the relevant version information. const withDiff = false - log.VEventf(ctx, 1, "starting rangefeed from %v on %v", m.initialTimestamp, span) - // TODO(ajwerner): track the resolved timestamp - err := distSender.RangeFeed(ctx, span, m.initialTimestamp, withDiff, eventCh) + log.VEventf(ctx, 1, "starting rangefeed from %v on %v", ts, span) + err := distSender.RangeFeed(ctx, span, ts, withDiff, eventCh) if err != nil && ctx.Err() == nil { log.Warningf(ctx, "lease rangefeed failed, restarting: %v", err) } if ctx.Err() != nil { - log.VEventf(ctx, 0, "exiting rangefeed") + log.VEventf(ctx, 1, "exiting rangefeed") return } } @@ -1952,13 +2000,14 @@ func (m *LeaseManager) watchForRangefeedUpdates( // actually reads the table, but it's necessary for the call to // ValidateTable(). if err := table.MaybeFillInDescriptor(ctx, nil, m.codec); err != nil { - log.Warningf(ctx, "%s: unable to fill in table descriptor %v", ev.Key, table) + log.ReportOrPanic(ctx, &m.settings.SV, + "%s: unable to fill in table descriptor %v", ev.Key, table) return } if err := table.ValidateTable(); err != nil { - log.Errorf(ctx, "%s: received invalid table descriptor: %s. Desc: %v", - ev.Key, err, table, - ) + // Note: we don't ReportOrPanic here because invalid descriptors are + // sometimes created during testing. + log.Errorf(ctx, "%s: received invalid table descriptor: %s. 
Desc: %v", ev.Key, err, table) return } if log.V(2) { @@ -1969,7 +2018,6 @@ func (m *LeaseManager) watchForRangefeedUpdates( case <-ctx.Done(): case tableUpdateCh <- table: } - return } s.RunWorker(ctx, func(ctx context.Context) { for { @@ -1978,7 +2026,8 @@ func (m *LeaseManager) watchForRangefeedUpdates( return case e := <-eventCh: if e.Checkpoint != nil { - log.Infof(ctx, "got checkpoint %v", e.Checkpoint) + log.VEventf(ctx, 2, "got rangefeed checkpoint %v", e.Checkpoint) + m.setResolvedTimestamp(e.Checkpoint.ResolvedTS) continue } if e.Error != nil { @@ -2004,10 +2053,13 @@ func (m *LeaseManager) handleUpdatedSystemCfg( if log.V(2) { log.Info(ctx, "received a new config; will refresh leases") } - + var latestTimestamp hlc.Timestamp cfgFilter.ForModified(cfg, func(kv roachpb.KeyValue) { // Attempt to unmarshal config into a table/database descriptor. var descriptor sqlbase.Descriptor + if latestTimestamp.Less(kv.Value.Timestamp) { + latestTimestamp = kv.Value.Timestamp + } if err := kv.Value.GetProto(&descriptor); err != nil { log.Warningf(ctx, "%s: unable to unmarshal descriptor %v", kv.Key, kv.Value) return @@ -2041,7 +2093,9 @@ func (m *LeaseManager) handleUpdatedSystemCfg( // Ignore. } }) - + if !latestTimestamp.IsEmpty() { + m.setResolvedTimestamp(latestTimestamp) + } // Attempt to shove a nil table descriptor into the channel to ensure that // we've processed all of the events previously sent. select { @@ -2052,6 +2106,59 @@ func (m *LeaseManager) handleUpdatedSystemCfg( } } +// waitForRangefeedsToBeUsable returns a channel which is closed when rangefeeds +// are usable according to the cluster version. +func (m *LeaseManager) waitForRangefeedsToBeUsable( + ctx context.Context, s *stop.Stopper, +) chan struct{} { + // TODO(ajwerner): Add a callback to notify about version changes. + // Checking is pretty cheap but really this should be a callback. + const defaultCheckInterval = 10 * time.Second + checkInterval := defaultCheckInterval + if m.testingKnobs.VersionPollIntervalForRangefeeds != 0 { + checkInterval = m.testingKnobs.VersionPollIntervalForRangefeeds + } + upgradeChan := make(chan struct{}) + timer := timeutil.NewTimer() + timer.Reset(0) + s.RunWorker(ctx, func(ctx context.Context) { + for { + select { + case <-timer.C: + timer.Read = true + if m.settings.Version.IsActive(ctx, clusterversion.VersionRangefeedLeases) { + close(upgradeChan) + return + } + timer.Reset(checkInterval) + case <-ctx.Done(): + return + case <-s.ShouldQuiesce(): + return + } + } + }) + return upgradeChan +} + +// setResolvedTimestamp marks the LeaseManager as having processed all updates +// up to this timestamp. It is set under the gossip path based on the highest +// timestamp seen in a system config and under the rangefeed path when a +// resolved timestamp is received. +func (m *LeaseManager) setResolvedTimestamp(ts hlc.Timestamp) { + m.mu.Lock() + defer m.mu.Unlock() + if m.mu.updatesResolvedTimestamp.Less(ts) { + m.mu.updatesResolvedTimestamp = ts + } +} + +func (m *LeaseManager) getResolvedTimestamp() hlc.Timestamp { + m.mu.Lock() + defer m.mu.Unlock() + return m.mu.updatesResolvedTimestamp +} + // tableLeaseRefreshLimit is the upper-limit on the number of table leases // that will continuously have their lease refreshed. var tableLeaseRefreshLimit = settings.RegisterIntSetting( @@ -2063,8 +2170,8 @@ var tableLeaseRefreshLimit = settings.RegisterIntSetting( // PeriodicallyRefreshSomeLeases so that leases are fresh and can serve // traffic immediately. 
// TODO(vivek): Remove once epoch based table leases are implemented. -func (m *LeaseManager) PeriodicallyRefreshSomeLeases() { - m.stopper.RunWorker(context.Background(), func(ctx context.Context) { +func (m *LeaseManager) PeriodicallyRefreshSomeLeases(ctx context.Context) { + m.stopper.RunWorker(ctx, func(ctx context.Context) { if m.leaseDuration <= 0 { return } diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index 946bc31c5e23..29284984c4c6 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" @@ -43,6 +44,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "github.com/lib/pq" "github.com/stretchr/testify/require" ) @@ -216,7 +218,8 @@ func (t *leaseTest) node(nodeID uint32) *sql.LeaseManager { t.server.Stopper(), t.cfg, ) - mgr.PeriodicallyRefreshSomeLeases() + ctx := logtags.AddTag(context.Background(), "leasemgr", nodeID) + mgr.PeriodicallyRefreshSomeLeases(ctx) t.nodes[nodeID] = mgr } return mgr @@ -701,7 +704,7 @@ func TestSubqueryLeases(t *testing.T) { // Note: we don't use close(fooRelease) here because the // lease on "foo" may be re-acquired (and re-released) // multiple times, at least once for the first - // CREATE/SELECT pair and one for the final DROP. + // CREATE/SELECT pair and one for the finalf DROP. atomic.AddInt32(&fooReleaseCount, 1) fooRelease <- struct{}{} } @@ -2146,93 +2149,198 @@ func TestIntentOnSystemConfigDoesNotPreventSchemaChange(t *testing.T) { } } -// TestIntentOnSystemConfigDoesNotPreventSchemaChange tests that failures to -// gossip the system config due to intents are rectified when later intents -// are aborted. -func TestIntentOnSystemConfigDoesNotPreventSchemaChange2(t *testing.T) { +// TestFinalizeVersionEnablesRangefeedUpdates ensures that gossip is used when +// the version is initialized to something prior to VersionRangefeedLeases and +// then that rangefeeds are adopted once that version is finalized. +// +// TODO(ajwerner): Remove this test in 21.1 as it is no longer relevant. +func TestFinalizeVersionEnablesRangefeedUpdates(t *testing.T) { defer leaktest.AfterTest(t)() + // The test first starts a cluster at a version below VersionRangefeedLeases + // and ensure that schema changes don't block for too long. Meanwhile ensure + // that no rangefeed has been created on the system config span. Then finalize + // the version upgrade and ensure that a rangefeed is created and that + // schema changes still work. 
+ ctx := context.Background() - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + var rangefeedsCreated int64 + descriptorTablePrefix := keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID) + descriptorTableSpan := roachpb.Span{ + Key: descriptorTablePrefix, + EndKey: descriptorTablePrefix.PrefixEnd(), + } + tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ SQLLeaseManager: &sql.LeaseManagerTestingKnobs{ - AlwaysUseRangefeeds: true, + VersionPollIntervalForRangefeeds: time.Millisecond, + }, + Store: &kvserver.StoreTestingKnobs{ + // Add a filter to detect the creation of a rangefeed over the + // descriptor table. + TestingRangefeedFilter: func( + args *roachpb.RangeFeedRequest, _ roachpb.Internal_RangeFeedServer, + ) *roachpb.Error { + if args.Span.Overlaps(descriptorTableSpan) { + atomic.AddInt64(&rangefeedsCreated, 1) + } + return nil + }, }, Server: &server.TestingKnobs{ - // TODO(ajwerner): A - - BootstrapVersionOverride: clusterversion.TestingBinaryVersion, + // We're going to manually control when the upgrade takes place below + // so disable the automatic upgrade. + DisableAutomaticVersionUpgrade: 1, + // Bootstrap the cluster at something below VersionRangefeedLeases so + // that we can test the upgrade. + BootstrapVersionOverride: clusterversion.VersionByKey(clusterversion.Version20_1), }, }, }, }) defer tc.Stopper().Stop(ctx) - db := tc.ServerConn(0) - tdb := sqlutils.MakeSQLRunner(db) - tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms';") - tdb.Exec(t, "SET CLUSTER SETTING kv.rangefeed.enabled = true;") + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)") - time.Sleep(time.Second) + // Lease table foo on node 2. + db2 := tc.ServerConn(1) + var junk int + require.EqualValues(t, gosql.ErrNoRows, db2.QueryRow("SELECT * FROM foo").Scan(&junk)) - log.Infof(ctx, "created table at %v", tc.Server(0).Clock().Now()) - connA, err := db.Conn(ctx) - require.NoError(t, err) - connB, err := db.Conn(ctx) - require.NoError(t, err) + // Run a schema change which will require a notification to finish. + tdb.Exec(t, "ALTER TABLE foo ADD COLUMN j INT NOT NULL DEFAULT 2") + require.Equal(t, int64(0), atomic.LoadInt64(&rangefeedsCreated)) - txA, err := connA.BeginTx(ctx, &gosql.TxOptions{}) - require.NoError(t, err) - txB, err := connB.BeginTx(ctx, &gosql.TxOptions{}) - require.NoError(t, err) + // Upgrade to after VersionRangefeedLeases and ensure that a rangefeed is created. + tdb.Exec(t, "SET CLUSTER SETTING version = crdb_internal.node_executable_version();") + testutils.SucceedsSoon(t, func() error { + if atomic.LoadInt64(&rangefeedsCreated) == 0 { + return errors.New("no rangefeeds created") + } + return nil + }) + tdb.Exec(t, "ALTER TABLE foo ADD COLUMN k INT NOT NULL DEFAULT 2") +} - // Lay down an intent on the system config span. 
- _, err = txA.Exec("CREATE TABLE bar (i INT PRIMARY KEY)") - require.NoError(t, err) +func ensureTestTakesLessThan(t *testing.T, allowed time.Duration) func() { + start := timeutil.Now() + return func() { + if t.Failed() { + return + } + if took := timeutil.Since(start); took > allowed { + t.Fatalf("test took %v which is greater than %v", took, allowed) + } + } +} - _, err = txB.Exec("ALTER TABLE foo ADD COLUMN j INT NOT NULL DEFAULT 2") - require.NoError(t, err) +// TestRangefeedUpdatesHandledProperlyInTheFaceOfRaces deals with the case where +// we have a request to lease a table descriptor, we read version 1 and prior to +// adding that to the state, we get an update indicating that that version is +// too old. +func TestRangefeedUpdatesHandledProperlyInTheFaceOfRaces(t *testing.T) { + defer leaktest.AfterTest(t)() + defer ensureTestTakesLessThan(t, 30*time.Second)() - getFooVersion := func() (version int) { - tx, err := db.Begin() - require.NoError(t, err) - // Prevent this transaction from blocking on intents. - _, err = tx.Exec("SET TRANSACTION PRIORITY HIGH") - require.NoError(t, err) - require.NoError(t, tx.QueryRow( - "SELECT version FROM crdb_internal.tables WHERE name = 'foo'"). - Scan(&version)) - require.NoError(t, tx.Commit()) - return version + ctx := context.Background() + var interestingTable atomic.Value + interestingTable.Store(sqlbase.ID(0)) + blockLeaseAcquisitionOfInterestingTable := make(chan chan struct{}) + unblockAll := make(chan struct{}) + args := base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Server: &server.TestingKnobs{ + BootstrapVersionOverride: clusterversion.VersionByKey(clusterversion.VersionRangefeedLeases), + }, + }, } + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: args, + }) + tableUpdateChan := make(chan *sqlbase.TableDescriptor) + args.Knobs.SQLLeaseManager = &sql.LeaseManagerTestingKnobs{ + TestingTableUpdateEvent: func(table *sqlbase.TableDescriptor) error { + // Use this testing knob to ensure that we see an update for the table + // in question. We don't care about events to refresh the first version + // which can happen under rare stress scenarios. + if table.ID == interestingTable.Load().(sqlbase.ID) && table.Version >= 2 { + select { + case tableUpdateChan <- table: + case <-unblockAll: + } + } + return nil + }, + LeaseStoreTestingKnobs: sql.LeaseStoreTestingKnobs{ + LeaseAcquiredEvent: func(table sqlbase.TableDescriptor, err error) { + // Block the lease acquisition for the table after the leasing + // transaction has been issued. We'll wait to unblock it until after + // the new version has been published and that even has been received. + if table.ID != interestingTable.Load().(sqlbase.ID) { + return + } + blocked := make(chan struct{}) + select { + case blockLeaseAcquisitionOfInterestingTable <- blocked: + <-blocked + case <-unblockAll: + } + }, + }, + } + // Start a second server with our knobs. + tc.AddServer(t, args) + defer tc.Stopper().Stop(ctx) - // Fire off the commit. In order to return, the table descriptor will need - // to make it through several versions. We wait until the version has been - // incremented once before we rollback txA. - origVersion := getFooVersion() - errCh := make(chan error) + db1 := tc.ServerConn(0) + tdb1 := sqlutils.MakeSQLRunner(db1) + db2 := tc.ServerConn(1) + + // Create a couple of tables. + tdb1.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)") + + // Find the table ID for the table we'll be mucking with. 
+ var tableID sqlbase.ID + tdb1.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name = $1 AND database_name = current_database()", + "foo").Scan(&tableID) + interestingTable.Store(tableID) + + // Launch a goroutine to query foo. It will be blocked in lease acquisition. + selectDone := make(chan error) go func() { - errCh <- txB.Commit() - log.Infof(ctx, "committed txn at %v", tc.Server(0).Clock().Now()) + var count int + selectDone <- db2.QueryRow("SELECT count(*) FROM foo").Scan(&count) }() - testutils.SucceedsSoon(t, func() error { - if got := getFooVersion(); got < origVersion { - return fmt.Errorf("got origVersion %d, expected greater", got) - } - return nil - }) - // Roll back txA which had left an intent on the system config span which - // prevented the leaseholders of origVersion of foo from being notified. - // Ensure that those leaseholders are notified in a timely manner. - require.NoError(t, txA.Rollback()) + // Make sure it got blocked. + toUnblockForLeaseAcquisition := <-blockLeaseAcquisitionOfInterestingTable - const extremelyLongTime = 100 * time.Second - select { - case <-time.After(extremelyLongTime): - t.Fatalf("schema change did not complete in %v", extremelyLongTime) - case err := <-errCh: - require.NoError(t, err) - } + // Launch a goroutine to perform a schema change which will lead to new + // versions. + alterErrCh := make(chan error) + go func() { + _, err := db1.Exec("ALTER TABLE foo ADD COLUMN j INT DEFAULT 1") + alterErrCh <- err + }() + + // Make sure we get an update. Note that this is after we have already + // acquired a lease on the old version but have not yet recorded that fact. + table := <-tableUpdateChan + require.Equal(t, sqlbase.DescriptorVersion(2), table.Version) + + // Allow the original lease acquisition to proceed. + close(toUnblockForLeaseAcquisition) + + // Ensure the query completes. + <-selectDone + + // Allow everything to proceed as usual. + close(unblockAll) + // Ensure the schema change completes. + <-alterErrCh + + // Ensure that the new schema is in use on n2. + var i, j int + require.Equal(t, gosql.ErrNoRows, db2.QueryRow("SELECT i, j FROM foo").Scan(&i, &j)) } From 9bf0ceb751c5d85b295d6940e6158b808f3f0868 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 30 Apr 2020 18:49:35 -0400 Subject: [PATCH 09/10] server,kvserver: use real Descriptor value in test The test would hit an assertion in the leasemanager otherwise. 
Release note: None --- pkg/kv/kvserver/gossip_test.go | 5 +++-- pkg/server/server_test.go | 25 +++++++++++++++++-------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/pkg/kv/kvserver/gossip_test.go b/pkg/kv/kvserver/gossip_test.go index 46bc52b38538..b7a52451a32c 100644 --- a/pkg/kv/kvserver/gossip_test.go +++ b/pkg/kv/kvserver/gossip_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -211,10 +212,10 @@ func TestGossipAfterAbortOfSystemConfigTransactionAfterFailureDueToIntents(t *te txB := db.NewTxn(ctx, "b") require.NoError(t, txA.SetSystemConfigTrigger()) - require.NoError(t, txA.Put(ctx, keys.SystemSQLCodec.DescMetadataKey(1000), "foo")) + require.NoError(t, txA.Put(ctx, keys.SystemSQLCodec.DescMetadataKey(1000), &sqlbase.Descriptor{})) require.NoError(t, txB.SetSystemConfigTrigger()) - require.NoError(t, txB.Put(ctx, keys.SystemSQLCodec.DescMetadataKey(2000), "bar")) + require.NoError(t, txB.Put(ctx, keys.SystemSQLCodec.DescMetadataKey(2000), &sqlbase.Descriptor{})) const someTime = 10 * time.Millisecond clearNotifictions := func(ch <-chan struct{}) { diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index d6d62044fcf8..e0327a6c34be 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -540,14 +540,17 @@ func TestMultiRangeScanWithPagination(t *testing.T) { func TestSystemConfigGossip(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) - defer s.Stopper().Stop(context.TODO()) + defer s.Stopper().Stop(ctx) ts := s.(*TestServer) - ctx := context.TODO() key := sqlbase.MakeDescMetadataKey(keys.SystemSQLCodec, keys.MaxReservedDescID) - valAt := func(i int) *sqlbase.DatabaseDescriptor { - return &sqlbase.DatabaseDescriptor{Name: "foo", ID: sqlbase.ID(i)} + valAt := func(i int) *sqlbase.Descriptor { + return sqlbase.WrapDescriptor(&sqlbase.DatabaseDescriptor{ + ID: sqlbase.ID(i), + Name: "foo", + }) } // Register a callback for gossip updates. @@ -598,12 +601,18 @@ func TestSystemConfigGossip(t *testing.T) { } // Make sure the returned value is valAt(2). - got := new(sqlbase.DatabaseDescriptor) - if err := val.GetProto(got); err != nil { + var got sqlbase.Descriptor + if err := val.GetProto(&got); err != nil { return err } - if expected := valAt(2); !reflect.DeepEqual(got, expected) { - return errors.Errorf("mismatch: expected %+v, got %+v", *expected, *got) + + expected := valAt(2).GetDatabase() + db := got.GetDatabase() + if db == nil { + panic(errors.Errorf("found nil database: %v", got)) + } + if !reflect.DeepEqual(*db, *expected) { + panic(errors.Errorf("mismatch: expected %+v, got %+v", *expected, *db)) } return nil }) From b48d26f4fe274be0ea45624a0f599522eb7ba1f6 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Fri, 24 Apr 2020 14:34:02 -0400 Subject: [PATCH 10/10] roachpb: rework replicas.CanMakeProgress() CanMakeProgress was delegating to Raft, but also had a local fast-path. The fast-path was broken: it was considering a configuration with two voter-fulls and a voter-incoming, and one of the fulls dead, to be able to make progress. That was false; the configuration doesn't have quorum for the outgoing config. 
This patch fixes the bug, and also removes the dependency on the Raft code. The way in which we were using the Raft library was quite inefficient, and I want to start using this function in the replication report, which will call it a lot. For purposes of deciding under-replication, that report already needs to understand the different replica states instead of delegating everything to Raft. So let's embrace this responsibility more fully. Release note: None --- pkg/roachpb/metadata_replicas.go | 69 ++++++++++---------- pkg/roachpb/metadata_replicas_test.go | 90 ++++++++++++++++++++++++++- 2 files changed, 121 insertions(+), 38 deletions(-) diff --git a/pkg/roachpb/metadata_replicas.go b/pkg/roachpb/metadata_replicas.go index 9cf413417acb..3e0a8b16f3c5 100644 --- a/pkg/roachpb/metadata_replicas.go +++ b/pkg/roachpb/metadata_replicas.go @@ -14,10 +14,7 @@ import ( "fmt" "strings" - "go.etcd.io/etcd/raft/confchange" - "go.etcd.io/etcd/raft/quorum" "go.etcd.io/etcd/raft/raftpb" - "go.etcd.io/etcd/raft/tracker" ) // ReplicaTypeVoterFull returns a VOTER_FULL pointer suitable for use in a @@ -331,41 +328,43 @@ func (d ReplicaDescriptors) ConfState() raftpb.ConfState { // replication layer. This is more complicated than just counting the number // of replicas due to the existence of joint quorums. func (d ReplicaDescriptors) CanMakeProgress(liveFunc func(descriptor ReplicaDescriptor) bool) bool { - voters := d.Voters() - var c int - // Take the fast path when there are only "current and future" voters, i.e. - // no learners and no voters of type VOTER_OUTGOING. The config may be joint, - // but the outgoing conf is subsumed by the incoming one. - if n := len(d.wrapped); len(voters) == n { - for _, rDesc := range voters { - if liveFunc(rDesc) { - c++ - } + isVoterOldConfig := func(rDesc ReplicaDescriptor) bool { + switch rDesc.GetType() { + case VOTER_FULL, VOTER_OUTGOING, VOTER_DEMOTING: + return true + default: + return false } - return c >= n/2+1 } - - // Slow path. For simplicity, don't try to duplicate the logic that already - // exists in raft. - cfg, _, err := confchange.Restore( - confchange.Changer{Tracker: tracker.MakeProgressTracker(1)}, - d.ConfState(), - ) - if err != nil { - panic(err) + isVoterNewConfig := func(rDesc ReplicaDescriptor) bool { + switch rDesc.GetType() { + case VOTER_FULL, VOTER_INCOMING: + return true + default: + return false + } } - // Simulate a voting round in which the live replicas vote affirmatively and - // the dead replicas don't vote at all. We'll then check if the live votes are - // enough to commit commands (an outcome of VoteWon) or not (an outcome of - // VotePending). - votes := make(map[uint64]bool, len(d.wrapped)) - for _, rDesc := range d.wrapped { - // NB: it doesn't matter for the outcome whether we count unavailable - // replicas as abstaining or rejecting, as we're checking only for VoteWon - // below. - if liveFunc(rDesc) { - votes[uint64(rDesc.ReplicaID)] = true + // isBoth takes two replica predicates and returns their conjunction. + isBoth := func( + pred1 func(rDesc ReplicaDescriptor) bool, + pred2 func(rDesc ReplicaDescriptor) bool) func(ReplicaDescriptor) bool { + return func(rDesc ReplicaDescriptor) bool { + return pred1(rDesc) && pred2(rDesc) } } - return cfg.Voters.VoteResult(votes) == quorum.VoteWon + + votersOldGroup := d.Filter(isVoterOldConfig) + liveVotersOldGroup := d.Filter(isBoth(isVoterOldConfig, liveFunc)) + + n := len(votersOldGroup) + // Empty groups succeed by default, to match the Raft implementation. 
+ if n > 0 && len(liveVotersOldGroup) < n/2+1 { + return false + } + + votersNewGroup := d.Filter(isVoterNewConfig) + liveVotersNewGroup := d.Filter(isBoth(isVoterNewConfig, liveFunc)) + + n = len(votersNewGroup) + return len(liveVotersNewGroup) >= n/2+1 } diff --git a/pkg/roachpb/metadata_replicas_test.go b/pkg/roachpb/metadata_replicas_test.go index 97cc8fcf745c..12dd01d95774 100644 --- a/pkg/roachpb/metadata_replicas_test.go +++ b/pkg/roachpb/metadata_replicas_test.go @@ -11,12 +11,21 @@ package roachpb import ( + "context" + "math/rand" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.etcd.io/etcd/raft" + "go.etcd.io/etcd/raft/confchange" + "go.etcd.io/etcd/raft/quorum" + "go.etcd.io/etcd/raft/tracker" ) func rd(typ *ReplicaType, id uint64) ReplicaDescriptor { @@ -232,13 +241,20 @@ func TestReplicaDescriptorsCanMakeProgress(t *testing.T) { {false, rd(v, 2)}, {true, rd(v, 3)}, }, true}, - // Two out of three voters alive, but one is an incoming voter. (This - // still uses the fast path). + // Two out of three voters alive, but one is an incoming voter. The outgoing + // group doesn't have quorum. {[]descWithLiveness{ {true, rd(v, 1)}, {false, rd(v, 2)}, {true, rd(vi, 3)}, - }, true}, + }, false}, + // Two out of three voters alive, but one is an outgoing voter. The incoming + // group doesn't have quorum. + {[]descWithLiveness{ + {true, rd(v, 1)}, + {false, rd(v, 2)}, + {true, rd(vd, 3)}, + }, false}, // Two out of three voters dead, and they're all incoming voters. (This // can't happen in practice because it means there were zero voters prior // to the conf change, but still this result is correct, similar to others @@ -304,3 +320,71 @@ func TestReplicaDescriptorsCanMakeProgress(t *testing.T) { }) } } + +// Test that ReplicaDescriptors.CanMakeProgress() agrees with the equivalent +// etcd/raft's code. We generate random configs and then see whether out +// determination for unavailability matches etcd/raft. +func TestReplicaDescriptorsCanMakeProgressRandom(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + randutil.SeedForTests() + + var progress, noProgress, skipped int + + start := timeutil.Now() + for timeutil.Now().Sub(start) < 100*time.Millisecond { + // Generate a random range configuration with between 1 and 7 replicas. + size := 1 + rand.Intn(6) + rds := make([]ReplicaDescriptor, size) + liveness := make([]bool, size) + // Generate a bunch of bits, each one representing the liveness of a different replica. 
+ livenessBits := rand.Int31() + for i := range rds { + rds[i].ReplicaID = ReplicaID(i + 1) + typ := ReplicaType(rand.Intn(len(ReplicaType_name))) + rds[i].Type = &typ + liveness[i] = (livenessBits >> i & 1) == 0 + } + + rng := MakeReplicaDescriptors(rds) + + crdbCanMakeProgress := rng.CanMakeProgress(func(rd ReplicaDescriptor) bool { + return liveness[rd.ReplicaID-1] + }) + + raftCanMakeProgress, skip := func() (res bool, skip bool) { + cfg, _, err := confchange.Restore( + confchange.Changer{Tracker: tracker.MakeProgressTracker(1)}, + rng.ConfState(), + ) + if err != nil { + if err.Error() != "removed all voters" { + t.Fatal(err) + } + return false, true + } + votes := make(map[uint64]bool, len(rng.wrapped)) + for _, rDesc := range rng.wrapped { + if liveness[rDesc.ReplicaID-1] { + votes[uint64(rDesc.ReplicaID)] = true + } + } + return cfg.Voters.VoteResult(votes) == quorum.VoteWon, false + }() + + if skip { + // Going to an empty config, which is non-sensical. Skipping input. + skipped++ + continue + } + require.Equalf(t, raftCanMakeProgress, crdbCanMakeProgress, + "input: %s liveness: %v", rng, liveness) + if crdbCanMakeProgress { + progress++ + } else { + noProgress++ + } + } + log.Infof(ctx, "progress: %d cases. no progress: %d cases. skipped: %d cases.", + progress, noProgress, skipped) +}
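
For illustration only, here is a standalone sketch (not part of any patch above) of the joint-quorum rule that the reworked CanMakeProgress encodes: a change can be committed only if the live voters form a majority of the outgoing voter set (VOTER_FULL, VOTER_OUTGOING, VOTER_DEMOTING) and, independently, a majority of the incoming voter set (VOTER_FULL, VOTER_INCOMING). The type and function names below are invented for the example and are not CockroachDB APIs.

// Sketch of the joint-quorum progress rule described in the final commit.
package main

import "fmt"

type replicaType int

const (
	voterFull replicaType = iota
	voterIncoming
	voterOutgoing
	voterDemoting
	learner
)

type replica struct {
	id   int
	typ  replicaType
	live bool
}

// majorityLive reports whether the live members of the given set form a
// strict majority. An empty set trivially has quorum, matching etcd/raft.
func majorityLive(set []replica) bool {
	if len(set) == 0 {
		return true
	}
	live := 0
	for _, r := range set {
		if r.live {
			live++
		}
	}
	return live >= len(set)/2+1
}

// canMakeProgress applies the rule from the patch: quorum is required in both
// the outgoing config (FULL, OUTGOING, DEMOTING) and the incoming config
// (FULL, INCOMING); learners never count toward either.
func canMakeProgress(replicas []replica) bool {
	var outgoing, incoming []replica
	for _, r := range replicas {
		switch r.typ {
		case voterFull, voterOutgoing, voterDemoting:
			outgoing = append(outgoing, r)
		}
		switch r.typ {
		case voterFull, voterIncoming:
			incoming = append(incoming, r)
		}
	}
	return majorityLive(outgoing) && majorityLive(incoming)
}

func main() {
	// The case called out in the commit message: two VOTER_FULLs (one dead)
	// plus a live VOTER_INCOMING. The incoming config {1, 3} has quorum, but
	// the outgoing config {1, 2} does not, so no progress can be made.
	joint := []replica{
		{id: 1, typ: voterFull, live: true},
		{id: 2, typ: voterFull, live: false},
		{id: 3, typ: voterIncoming, live: true},
	}
	fmt.Println(canMakeProgress(joint)) // false

	// A plain three-replica config with one dead replica still has quorum.
	simple := []replica{
		{id: 1, typ: voterFull, live: true},
		{id: 2, typ: voterFull, live: false},
		{id: 3, typ: voterFull, live: true},
	}
	fmt.Println(canMakeProgress(simple)) // true
}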