From a95331aa8b35a371fad40c16ae03db71ef22fdef Mon Sep 17 00:00:00 2001 From: Vivek Menezes Date: Thu, 8 Jun 2017 20:58:10 -0400 Subject: [PATCH] sql: changed LeaseManager API to return/use table descriptors The Acquire/AcquireByName and Release() methods no longer return/use a LeaseState and instead return/use a sqlbase.TableDescriptor with an expiration time. the LeaseManager is moving towards a model where it returns a table descriptor with an expiration time for the use of the table descriptor. For now this table descriptor is associated with a lease, but in the future it might be an older version of the table descriptor, in which case it will not have a lease. With this change, leases on the same table descriptor version are renewed under the covers without needing the API to issue an explicit release on a table descriptor version. The refcount on the old lease is simply transferred to the new one. The new lease is guaranteed to have a higher expiration time and thus a prior returned expiration time returned by the API is guaranteed to be contained within the new lease expiration time. A Release() on the table descriptor will reduce the refcount on the current lease. In the future, this will reduce the refcount on a table version which might not be associated with a lease. Removed the need to use removeLeaseIfExpiring since once a transaction has picked a timestamp and a valid table descriptor for the timestamp the table descriptor need not be reacquired through the API. related to #2948 --- pkg/internal/client/db_test.go | 1 - pkg/internal/client/txn.go | 5 - pkg/sql/backfill.go | 20 ++-- pkg/sql/data_source.go | 4 +- pkg/sql/executor.go | 2 +- pkg/sql/executor_test.go | 2 +- pkg/sql/helpers_test.go | 23 +++-- pkg/sql/internal.go | 8 +- pkg/sql/lease.go | 163 ++++++++++++++++-------------- pkg/sql/lease_internal_test.go | 131 ++++++++++++------------ pkg/sql/lease_test.go | 119 +++++++++++----------- pkg/sql/parallel_stmts_test.go | 2 +- pkg/sql/pgwire/pgwire_test.go | 1 + pkg/sql/planner.go | 6 +- pkg/sql/schema_changer_test.go | 4 +- pkg/sql/session.go | 28 +++--- pkg/sql/show.go | 2 +- pkg/sql/show_fingerprints.go | 2 +- pkg/sql/table.go | 177 +++++++++++++++------------------ pkg/sql/table_test.go | 54 ---------- pkg/sql/truncate.go | 4 +- pkg/sql/update.go | 2 +- 22 files changed, 362 insertions(+), 398 deletions(-) diff --git a/pkg/internal/client/db_test.go b/pkg/internal/client/db_test.go index a8858f2bfd5d..884db6041d1a 100644 --- a/pkg/internal/client/db_test.go +++ b/pkg/internal/client/db_test.go @@ -389,7 +389,6 @@ func TestCommonMethods(t *testing.T) { {txnType, "IsFinalized"}: {}, {txnType, "NewBatch"}: {}, {txnType, "Exec"}: {}, - {txnType, "GetDeadline"}: {}, {txnType, "ResetDeadline"}: {}, {txnType, "Run"}: {}, {txnType, "SetDebugName"}: {}, diff --git a/pkg/internal/client/txn.go b/pkg/internal/client/txn.go index c7ecf7774e47..89325d211ce8 100644 --- a/pkg/internal/client/txn.go +++ b/pkg/internal/client/txn.go @@ -523,11 +523,6 @@ func (txn *Txn) ResetDeadline() { txn.deadline = nil } -// GetDeadline returns the deadline. For testing. -func (txn *Txn) GetDeadline() *hlc.Timestamp { - return txn.deadline -} - // Rollback sends an EndTransactionRequest with Commit=false. // txn is considered finalized and cannot be used to send any more commands. func (txn *Txn) Rollback(ctx context.Context) error { diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 19742af3b777..81846a0304d7 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -232,10 +232,10 @@ func (sc *SchemaChanger) maybeWriteResumeSpan( return nil } -func (sc *SchemaChanger) getTableLease( - ctx context.Context, txn *client.Txn, lc *LeaseCollection, version sqlbase.DescriptorVersion, +func (sc *SchemaChanger) getTableVersion( + ctx context.Context, txn *client.Txn, tc *TableCollection, version sqlbase.DescriptorVersion, ) (*sqlbase.TableDescriptor, error) { - tableDesc, err := lc.getTableLeaseByID(ctx, txn, sc.tableID) + tableDesc, err := tc.getTableVersionByID(ctx, txn, sc.tableID) if err != nil { return nil, err } @@ -285,9 +285,9 @@ func (sc *SchemaChanger) truncateIndexes( return err } - lc := &LeaseCollection{leaseMgr: sc.leaseMgr} - defer lc.releaseLeases(ctx) - tableDesc, err := sc.getTableLease(ctx, txn, lc, version) + tc := &TableCollection{leaseMgr: sc.leaseMgr} + defer tc.releaseTables(ctx) + tableDesc, err := sc.getTableVersion(ctx, txn, tc, version) if err != nil { return err } @@ -402,10 +402,10 @@ func (sc *SchemaChanger) distBackfill( } log.VEventf(ctx, 2, "backfill: process %+v spans", spans) if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { - lc := &LeaseCollection{leaseMgr: sc.leaseMgr} + tc := &TableCollection{leaseMgr: sc.leaseMgr} // Use a leased table descriptor for the backfill. - defer lc.releaseLeases(ctx) - tableDesc, err := sc.getTableLease(ctx, txn, lc, version) + defer tc.releaseTables(ctx) + tableDesc, err := sc.getTableVersion(ctx, txn, tc, version) if err != nil { return err } @@ -415,7 +415,7 @@ func (sc *SchemaChanger) distBackfill( if backfillType == columnBackfill { fkTables := sqlbase.TablesNeededForFKs(*tableDesc, sqlbase.CheckUpdates) for k := range fkTables { - table, err := lc.getTableLeaseByID(ctx, txn, k) + table, err := tc.getTableVersionByID(ctx, txn, k) if err != nil { return err } diff --git a/pkg/sql/data_source.go b/pkg/sql/data_source.go index fd0a1b723713..4894c676ce97 100644 --- a/pkg/sql/data_source.go +++ b/pkg/sql/data_source.go @@ -458,7 +458,7 @@ func (p *planner) getTableScanByRef( scanVisibility scanVisibility, ) (planDataSource, error) { tableID := sqlbase.ID(tref.TableID) - descFunc := p.session.leases.getTableLeaseByID + descFunc := p.session.tables.getTableVersionByID if p.avoidCachedDescriptors { descFunc = sqlbase.GetTableDescFromID } @@ -502,7 +502,7 @@ func (p *planner) getTableScanOrViewPlan( desc, err = mustGetTableOrViewDesc( ctx, p.txn, p.getVirtualTabler(), tn, false /*allowAdding*/) } else { - desc, err = p.session.leases.getTableLease(ctx, p.txn, p.getVirtualTabler(), tn) + desc, err = p.session.tables.getTableVersion(ctx, p.txn, p.getVirtualTabler(), tn) } if err != nil { return planDataSource{}, err diff --git a/pkg/sql/executor.go b/pkg/sql/executor.go index d1e5c728b59a..2c8e9d6e14d8 100644 --- a/pkg/sql/executor.go +++ b/pkg/sql/executor.go @@ -894,7 +894,7 @@ func (e *Executor) execParsed( // Exec the schema changers (if the txn rolled back, the schema changers // will short-circuit because the corresponding descriptor mutation is not // found). - session.leases.releaseLeases(session.Ctx()) + session.tables.releaseTables(session.Ctx()) txnState.schemaChangers.execSchemaChanges(session.Ctx(), e, session, res.ResultList) } diff --git a/pkg/sql/executor_test.go b/pkg/sql/executor_test.go index f8d33611cbb8..70481e0dad00 100644 --- a/pkg/sql/executor_test.go +++ b/pkg/sql/executor_test.go @@ -77,7 +77,7 @@ func TestPrepareCanAcquireLeases(t *testing.T) { // Acquire a lease and assert that the store did in fact create a // new lease. - _, err := s.LeaseManager().(*LeaseManager).Acquire( + _, _, err := s.LeaseManager().(*LeaseManager).Acquire( ctx, planner.txn, sqlbase.ID(dummyTableID), 0 /* version */) if err != nil { return nil, err diff --git a/pkg/sql/helpers_test.go b/pkg/sql/helpers_test.go index 599cc87e46b9..229c00471b35 100644 --- a/pkg/sql/helpers_test.go +++ b/pkg/sql/helpers_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) @@ -38,7 +39,7 @@ type LeaseRemovalTracker struct { mu syncutil.Mutex // map from a lease whose release we're waiting for to a tracker for that // lease. - tracking map[*LeaseState]RemovalTracker + tracking map[tableVersionID]RemovalTracker } type RemovalTracker struct { @@ -50,21 +51,25 @@ type RemovalTracker struct { // NewLeaseRemovalTracker creates a LeaseRemovalTracker. func NewLeaseRemovalTracker() *LeaseRemovalTracker { return &LeaseRemovalTracker{ - tracking: make(map[*LeaseState]RemovalTracker), + tracking: make(map[tableVersionID]RemovalTracker), } } // TrackRemoval starts monitoring lease removals for a particular lease. // This should be called before triggering the operation that (asynchronously) // removes the lease. -func (w *LeaseRemovalTracker) TrackRemoval(lease *LeaseState) RemovalTracker { +func (w *LeaseRemovalTracker) TrackRemoval(table sqlbase.TableDescriptor) RemovalTracker { + id := tableVersionID{ + id: table.ID, + version: table.Version, + } w.mu.Lock() defer w.mu.Unlock() - if tracker, ok := w.tracking[lease]; ok { + if tracker, ok := w.tracking[id]; ok { return tracker } tracker := RemovalTracker{removed: make(chan struct{}), err: new(error)} - w.tracking[lease] = tracker + w.tracking[id] = tracker return tracker } @@ -80,10 +85,14 @@ func (t RemovalTracker) WaitForRemoval() error { func (w *LeaseRemovalTracker) LeaseRemovedNotification(lease *LeaseState, err error) { w.mu.Lock() defer w.mu.Unlock() - if tracker, ok := w.tracking[lease]; ok { + id := tableVersionID{ + id: lease.ID, + version: lease.Version, + } + if tracker, ok := w.tracking[id]; ok { *tracker.err = err close(tracker.removed) - delete(w.tracking, lease) + delete(w.tracking, id) } } diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 3584b3d13aed..a6a8971d16af 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -45,7 +45,7 @@ func (ie InternalExecutor) ExecuteStatementInTransaction( ) (int, error) { p := makeInternalPlanner(opName, txn, security.RootUser, ie.LeaseManager.memMetrics) defer finishInternalPlanner(p) - p.session.leases.leaseMgr = ie.LeaseManager + p.session.tables.leaseMgr = ie.LeaseManager return p.exec(ctx, statement, qargs...) } @@ -57,7 +57,7 @@ func (ie InternalExecutor) QueryRowInTransaction( ) (parser.Datums, error) { p := makeInternalPlanner(opName, txn, security.RootUser, ie.LeaseManager.memMetrics) defer finishInternalPlanner(p) - p.session.leases.leaseMgr = ie.LeaseManager + p.session.tables.leaseMgr = ie.LeaseManager return p.QueryRow(ctx, statement, qargs...) } @@ -68,7 +68,7 @@ func (ie InternalExecutor) GetTableSpan( // Lookup the table ID. p := makeInternalPlanner("get-table-span", txn, user, ie.LeaseManager.memMetrics) defer finishInternalPlanner(p) - p.session.leases.leaseMgr = ie.LeaseManager + p.session.tables.leaseMgr = ie.LeaseManager tn := parser.TableName{DatabaseName: parser.Name(dbName), TableName: parser.Name(tableName)} tableID, err := getTableID(ctx, p, &tn) @@ -97,7 +97,7 @@ func getTableID(ctx context.Context, p *planner, tn *parser.TableName) (sqlbase. return virtual.GetID(), nil } - dbID, err := p.session.leases.databaseCache.getDatabaseID(ctx, p.txn, p.getVirtualTabler(), tn.Database()) + dbID, err := p.session.tables.databaseCache.getDatabaseID(ctx, p.txn, p.getVirtualTabler(), tn.Database()) if err != nil { return 0, err } diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index ab59a2946034..ee92ae12c4d5 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -75,7 +75,7 @@ type LeaseState struct { } func (s *LeaseState) String() string { - return fmt.Sprintf("%d(%q) ver=%d:%d", s.ID, s.Name, s.Version, s.expiration.UnixNano()) + return fmt.Sprintf("%d(%q) ver=%d:%d, refcount=%d", s.ID, s.Name, s.Version, s.expiration.UnixNano(), s.refcount) } // Expiration returns the expiration time of the lease. @@ -93,13 +93,6 @@ func (s *LeaseState) hasSomeLifeLeft(clock *hlc.Clock) bool { return s.expiration.After(minDesiredExpiration) } -// Refcount returns the reference count of the lease. -func (s *LeaseState) Refcount() int { - s.mu.Lock() - defer s.mu.Unlock() - return s.refcount -} - func (s *LeaseState) incRefcount() { s.mu.Lock() s.incRefcountLocked() @@ -110,10 +103,11 @@ func (s *LeaseState) incRefcountLocked() { panic(fmt.Sprintf("trying to incRefcount on released lease: %+v", s)) } s.refcount++ - if log.V(3) { - log.Infof(context.TODO(), "LeaseState.incRef: descID=%d name=%q version=%d refcount=%d", - s.ID, s.Name, s.Version, s.refcount) - } + log.VEventf(context.TODO(), 2, "LeaseState.incRef: %s", s) +} + +func (s *LeaseState) expirationToHLC() hlc.Timestamp { + return hlc.Timestamp{WallTime: s.Expiration().UnixNano()} } // LeaseStore implements the operations for acquiring and releasing leases and @@ -417,9 +411,11 @@ func (s LeaseStore) countLeases( } // leaseSet maintains an ordered set of LeaseState objects. It supports -// addition and removal of elements, finding a specific lease, finding the -// newest lease for a particular version and finding the newest lease for the -// most recent version. +// addition and removal of elements, finding the lease for a particular +// version, and finding the lease for the most recent version. +// TODO(vivek): make this a tableSet. A node only needs to manage a +// single lease on what it thinks is the latest version for a table +// descriptor. type leaseSet struct { // The lease state data is stored in a sorted slice ordered by . Ordering is maintained by insert and remove. @@ -438,7 +434,7 @@ func (l *leaseSet) String() string { } func (l *leaseSet) insert(s *LeaseState) { - i, match := l.findIndex(s.Version, s.expiration) + i, match := l.findIndex(s.Version) if match { panic("unable to insert duplicate lease") } @@ -452,36 +448,28 @@ func (l *leaseSet) insert(s *LeaseState) { } func (l *leaseSet) remove(s *LeaseState) { - i, match := l.findIndex(s.Version, s.expiration) + i, match := l.findIndex(s.Version) if !match { panic(fmt.Sprintf("can't find lease to remove: %s", s)) } l.data = append(l.data[:i], l.data[i+1:]...) } -func (l *leaseSet) find( - version sqlbase.DescriptorVersion, expiration parser.DTimestamp, -) *LeaseState { - if i, match := l.findIndex(version, expiration); match { +func (l *leaseSet) find(version sqlbase.DescriptorVersion) *LeaseState { + if i, match := l.findIndex(version); match { return l.data[i] } return nil } -func (l *leaseSet) findIndex( - version sqlbase.DescriptorVersion, expiration parser.DTimestamp, -) (int, bool) { +func (l *leaseSet) findIndex(version sqlbase.DescriptorVersion) (int, bool) { i := sort.Search(len(l.data), func(i int) bool { s := l.data[i] - if s.Version == version { - // a >= b -> !a.Before(b) - return !s.expiration.Before(expiration.Time) - } - return s.Version > version + return s.Version >= version }) if i < len(l.data) { s := l.data[i] - if s.Version == version && s.expiration.Equal(expiration.Time) { + if s.Version == version { return i, true } } @@ -607,7 +595,7 @@ func (t *tableState) acquireFromStoreLocked( if err != nil { return err } - t.active.insert(s) + t.upsertLocked(ctx, s, m) return nil } @@ -643,10 +631,33 @@ func (t *tableState) acquireFreshestFromStoreLocked( if err != nil { return err } - t.active.insert(s) + t.upsertLocked(ctx, s, m) return nil } +// upsertLocked inserts a lease for a particular table version. +// If an existing lease exists for the table version, it releases +// the older lease and replaces it. +func (t *tableState) upsertLocked(ctx context.Context, lease *LeaseState, m *LeaseManager) { + s := t.active.find(lease.Version) + if s == nil { + t.active.insert(lease) + return + } + + s.mu.Lock() + lease.mu.Lock() + // subsume the refcount of the older leasse. + lease.refcount += s.refcount + s.refcount = 0 + s.released = true + lease.mu.Unlock() + s.mu.Unlock() + log.VEventf(ctx, 2, "replaced lease: %s with %s", s, lease) + t.removeLease(s, m) + t.active.insert(lease) +} + // releaseInactiveLeases releases the leases in t.active.data with refcount 0. // t.mu must be locked. func (t *tableState) releaseInactiveLeases(m *LeaseManager) { @@ -717,13 +728,13 @@ func (t *tableState) acquireNodeLease( return lease, nil } -func (t *tableState) release(lease *LeaseState, m *LeaseManager) error { +func (t *tableState) release(table sqlbase.TableDescriptor, m *LeaseManager) error { t.mu.Lock() defer t.mu.Unlock() - s := t.active.find(lease.Version, lease.expiration) + s := t.active.find(table.Version) if s == nil { - return errors.Errorf("table %d version %d not found", lease.ID, lease.Version) + return errors.Errorf("table %d version %d not found", table.ID, table.Version) } // Decrements the refcount and returns true if the lease has to be removed // from the store. @@ -744,11 +755,9 @@ func (t *tableState) release(lease *LeaseState, m *LeaseManager) error { s.mu.Lock() defer s.mu.Unlock() s.refcount-- - if log.V(3) { - log.Infof(context.TODO(), "release: descID=%d name:%q version=%d refcount=%d", s.ID, s.Name, s.Version, s.refcount) - } + log.VEventf(context.TODO(), 2, "release: %s", s) if s.refcount < 0 { - panic(fmt.Sprintf("negative ref count: descID=%d(%q) version=%d refcount=%d", s.ID, s.Name, s.Version, s.refcount)) + panic(fmt.Sprintf("negative ref count: %s", s)) } if s.refcount == 0 && removeOnceDereferenced { s.released = true @@ -818,8 +827,8 @@ func (t *tableState) purgeOldLeases( return nil } - // Acquire a lease on the table at a version >= minVersion so that - // we maintain an active lease on the latest version, so that it + // Acquire a lease on the table at a version >= minVersion + // to maintain an active lease on the latest version, so that it // doesn't get released when releaseInactive() is called below. var lease *LeaseState err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { @@ -830,7 +839,7 @@ func (t *tableState) purgeOldLeases( if dropped := err == errTableDropped; dropped || err == nil { releaseInactives(dropped) if lease != nil { - return t.release(lease, m) + return t.release(lease.TableDescriptor, m) } return nil } @@ -906,7 +915,7 @@ func (c *tableNameCache) get(dbID sqlbase.ID, tableName string, clock *hlc.Clock } lease.mu.Lock() defer lease.mu.Unlock() - if !nameMatchesLease(lease, dbID, tableName) { + if !nameMatchesTable(lease.TableDescriptor, dbID, tableName) { panic(fmt.Sprintf("Out of sync entry in the name cache. "+ "Cache entry: %d.%q -> %d. Lease: %d.%q.", dbID, tableName, lease.ID, lease.ParentID, lease.Name)) @@ -1028,9 +1037,9 @@ func NewLeaseManager( return lm } -func nameMatchesLease(lease *LeaseState, dbID sqlbase.ID, tableName string) bool { - return lease.ParentID == dbID && - parser.ReNormalizeName(lease.Name) == parser.ReNormalizeName(tableName) +func nameMatchesTable(table sqlbase.TableDescriptor, dbID sqlbase.ID, tableName string) bool { + return table.ParentID == dbID && + parser.ReNormalizeName(table.Name) == parser.ReNormalizeName(tableName) } // AcquireByName acquires a read lease for the specified table. @@ -1038,11 +1047,11 @@ func nameMatchesLease(lease *LeaseState, dbID sqlbase.ID, tableName string) bool // lease manager knows about. func (m *LeaseManager) AcquireByName( ctx context.Context, txn *client.Txn, dbID sqlbase.ID, tableName string, -) (*LeaseState, error) { +) (sqlbase.TableDescriptor, hlc.Timestamp, error) { // Check if we have cached an ID for this name. lease := m.tableNames.get(dbID, tableName, m.clock) if lease != nil { - return lease, nil + return lease.TableDescriptor, lease.expirationToHLC(), nil } // We failed to find something in the cache, or what we found is not @@ -1052,13 +1061,13 @@ func (m *LeaseManager) AcquireByName( var err error tableID, err := m.resolveName(ctx, txn, dbID, tableName) if err != nil { - return nil, err + return sqlbase.TableDescriptor{}, hlc.Timestamp{}, err } - lease, err = m.Acquire(ctx, txn, tableID, 0) + table, expiration, err := m.Acquire(ctx, txn, tableID, 0) if err != nil { - return nil, err + return sqlbase.TableDescriptor{}, hlc.Timestamp{}, err } - if !nameMatchesLease(lease, dbID, tableName) { + if !nameMatchesTable(table, dbID, tableName) { // We resolved name `tableName`, but the lease has a different name in it. // That can mean two things. Assume the table is being renamed from A to B. // a) `tableName` is A. The transaction doing the RENAME committed (so the @@ -1089,24 +1098,25 @@ func (m *LeaseManager) AcquireByName( // How do we disambiguate between the a) and b)? We get a fresh lease on // the descriptor, as required by b), and then we'll know if we're trying to // resolve the current or the old name. - - if err := m.Release(lease); err != nil { + // + // TODO(vivek): check if the entire above comment is indeed true. + if err := m.Release(table); err != nil { log.Warningf(ctx, "error releasing lease: %s", err) } - lease, err = m.acquireFreshestFromStore(ctx, txn, tableID) + table, expiration, err = m.acquireFreshestFromStore(ctx, txn, tableID) if err != nil { - return nil, err + return sqlbase.TableDescriptor{}, hlc.Timestamp{}, err } - if lease == nil || !nameMatchesLease(lease, dbID, tableName) { + if !nameMatchesTable(table, dbID, tableName) { // If the name we had doesn't match the newest descriptor in the DB, then // we're trying to use an old name. - if err := m.Release(lease); err != nil { + if err := m.Release(table); err != nil { log.Warningf(ctx, "error releasing lease: %s", err) } - return nil, sqlbase.ErrDescriptorNotFound + return sqlbase.TableDescriptor{}, hlc.Timestamp{}, sqlbase.ErrDescriptorNotFound } } - return lease, nil + return table, expiration, nil } // resolveName resolves a table name to a descriptor ID by looking in the @@ -1134,13 +1144,16 @@ func (m *LeaseManager) resolveName( // it. func (m *LeaseManager) Acquire( ctx context.Context, txn *client.Txn, tableID sqlbase.ID, version sqlbase.DescriptorVersion, -) (*LeaseState, error) { +) (sqlbase.TableDescriptor, hlc.Timestamp, error) { t := m.findTableState(tableID, true) lease, err := t.acquire(ctx, txn, version, m) if m.LeaseStore.testingKnobs.LeaseAcquiredEvent != nil { m.LeaseStore.testingKnobs.LeaseAcquiredEvent(lease, err) } - return lease, err + if err != nil { + return sqlbase.TableDescriptor{}, hlc.Timestamp{}, err + } + return lease.TableDescriptor, lease.expirationToHLC(), nil } // acquireFreshestFromStore acquires a new lease from the store. The returned @@ -1151,28 +1164,28 @@ func (m *LeaseManager) Acquire( // for release()ing it. func (m *LeaseManager) acquireFreshestFromStore( ctx context.Context, txn *client.Txn, tableID sqlbase.ID, -) (*LeaseState, error) { +) (sqlbase.TableDescriptor, hlc.Timestamp, error) { t := m.findTableState(tableID, true) t.mu.Lock() defer t.mu.Unlock() if err := t.acquireFreshestFromStoreLocked( ctx, txn, 0 /* version */, m, ); err != nil { - return nil, err + return sqlbase.TableDescriptor{}, hlc.Timestamp{}, err } lease := t.active.findNewest(0) if lease == nil { panic("no lease in active set after having just acquired one") } lease.incRefcount() - return lease, nil + return lease.TableDescriptor, lease.expirationToHLC(), nil } // Release releases a previously acquired read lease. -func (m *LeaseManager) Release(lease *LeaseState) error { - t := m.findTableState(lease.ID, false /* create */) +func (m *LeaseManager) Release(desc sqlbase.TableDescriptor) error { + t := m.findTableState(desc.ID, false /* create */) if t == nil { - return errors.Errorf("table %d not found", lease.ID) + return errors.Errorf("table %d not found", desc.ID) } // TODO(pmattis): Can/should we delete from LeaseManager.tables if the // tableState becomes empty? @@ -1180,7 +1193,7 @@ func (m *LeaseManager) Release(lease *LeaseState) error { // could be bad if a lot of tables keep being created. I looked into cleaning // up a bit, but it seems tricky to do with the current locking which is split // between LeaseManager and tableState. - return t.release(lease, m) + return t.release(desc, m) } func (m *LeaseManager) isDraining() bool { @@ -1280,11 +1293,15 @@ func (m *LeaseManager) RefreshLeases(s *stop.Stopper, db *client.DB, gossip *gos }) } -// LeaseCollection is a collection of leases held by a single session that +// TableCollection is a collection of tables held by a single session that // serves SQL requests, or a background job using a table descriptor. -type LeaseCollection struct { - // leases holds the state of per-table leases acquired by the leaseMgr. - leases []*LeaseState +type TableCollection struct { + timestamp hlc.Timestamp + // A collection of table descriptor valid for the timestamp. + // They are released once the transaction using them is complete. + // If the transaction gets pushed and the timestamp changes, + // the tables are released. + tables []sqlbase.TableDescriptor // leaseMgr manages acquiring and releasing per-table leases. leaseMgr *LeaseManager // databaseCache is used as a cache for database names. diff --git a/pkg/sql/lease_internal_test.go b/pkg/sql/lease_internal_test.go index e6bee2a7806d..2760951e6533 100644 --- a/pkg/sql/lease_internal_test.go +++ b/pkg/sql/lease_internal_test.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" ) @@ -57,33 +58,26 @@ func TestLeaseSet(t *testing.T) { {newest{0}, "2:3"}, {newest{2}, "2:3"}, {newest{3}, ""}, - {insert{2, 1}, "2:1 2:3"}, - {newest{0}, "2:3"}, - {newest{2}, "2:3"}, - {newest{3}, ""}, - {insert{2, 4}, "2:1 2:3 2:4"}, + {remove{2, 3}, ""}, + {insert{2, 4}, "2:4"}, {newest{0}, "2:4"}, {newest{2}, "2:4"}, {newest{3}, ""}, - {insert{2, 2}, "2:1 2:2 2:3 2:4"}, - {insert{3, 1}, "2:1 2:2 2:3 2:4 3:1"}, + {insert{3, 1}, "2:4 3:1"}, {newest{0}, "3:1"}, {newest{1}, ""}, {newest{2}, "2:4"}, {newest{3}, "3:1"}, {newest{4}, ""}, - {insert{1, 1}, "1:1 2:1 2:2 2:3 2:4 3:1"}, + {insert{1, 1}, "1:1 2:4 3:1"}, {newest{0}, "3:1"}, {newest{1}, "1:1"}, {newest{2}, "2:4"}, {newest{3}, "3:1"}, {newest{4}, ""}, - {remove{2, 4}, "1:1 2:1 2:2 2:3 3:1"}, - {remove{3, 1}, "1:1 2:1 2:2 2:3"}, - {remove{1, 1}, "2:1 2:2 2:3"}, - {remove{2, 2}, "2:1 2:3"}, - {remove{2, 3}, "2:1"}, - {remove{2, 1}, ""}, + {remove{3, 1}, "1:1 2:4"}, + {remove{1, 1}, "2:4"}, + {remove{2, 4}, ""}, } set := &leaseSet{} @@ -156,30 +150,46 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test") - var leases []*LeaseState - err := kvDB.Txn(context.TODO(), func(ctx context.Context, txn *client.Txn) error { - for i := 0; i < 3; i++ { - lease, err := leaseManager.acquireFreshestFromStore(ctx, txn, tableDesc.ID) - if err != nil { - t.Fatal(err) - } - leases = append(leases, lease) - if err := leaseManager.Release(lease); err != nil { - t.Fatal(err) + var tables []sqlbase.TableDescriptor + var expiration hlc.Timestamp + getLeases := func() { + err := kvDB.Txn(context.TODO(), func(ctx context.Context, txn *client.Txn) error { + for i := 0; i < 3; i++ { + table, exp, err := leaseManager.acquireFreshestFromStore(ctx, txn, tableDesc.ID) + if err != nil { + t.Fatal(err) + } + tables = append(tables, table) + expiration = exp + if err := leaseManager.Release(table); err != nil { + t.Fatal(err) + } } + return nil + }) + if err != nil { + t.Fatal(err) } - return nil - }) - if err != nil { - t.Fatal(err) } + getLeases() ts := leaseManager.findTableState(tableDesc.ID, false) - if numLeases := getNumLeases(ts); numLeases != 3 { - t.Fatalf("found %d leases instead of 3", numLeases) + if numLeases := getNumLeases(ts); numLeases != 1 { + t.Fatalf("found %d leases instead of 1", numLeases) + } + // Publish a new version for the table + if _, err := leaseManager.Publish(context.TODO(), tableDesc.ID, func(*sqlbase.TableDescriptor) error { + return nil + }, nil); err != nil { + t.Fatal(err) } + getLeases() + ts = leaseManager.findTableState(tableDesc.ID, false) + if numLeases := getNumLeases(ts); numLeases != 2 { + t.Fatalf("found %d leases instead of 1", numLeases) + } if err := ts.purgeOldLeases( - context.TODO(), kvDB, false, 1 /* minVersion */, leaseManager); err != nil { + context.TODO(), kvDB, false, 2 /* minVersion */, leaseManager); err != nil { t.Fatal(err) } @@ -187,11 +197,16 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); t.Fatalf("found %d leases instead of 1", numLeases) } ts.mu.Lock() - correctLease := ts.active.data[0] == leases[2] + correctLease := ts.active.data[0].TableDescriptor.ID == tables[5].ID && + ts.active.data[0].TableDescriptor.Version == tables[5].Version + correctExpiration := ts.active.data[0].expirationToHLC() == expiration ts.mu.Unlock() if !correctLease { t.Fatalf("wrong lease survived purge") } + if !correctExpiration { + t.Fatalf("wrong lease expiration survived purge") + } } // Test that changing a descriptor's name updates the name cache. @@ -233,7 +248,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); if lease.ID != tableDesc.ID { t.Fatalf("new name has wrong ID: %d (expected: %d)", lease.ID, tableDesc.ID) } - if err := leaseManager.Release(lease); err != nil { + if err := leaseManager.Release(lease.TableDescriptor); err != nil { t.Fatal(err) } @@ -260,7 +275,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); if lease.ID != tableDesc.ID { t.Fatalf("new name has wrong ID: %d (expected: %d)", lease.ID, tableDesc.ID) } - if err := leaseManager.Release(lease); err != nil { + if err := leaseManager.Release(lease.TableDescriptor); err != nil { t.Fatal(err) } } @@ -293,7 +308,7 @@ CREATE TABLE t.%s (k CHAR PRIMARY KEY, v CHAR); if lease := leaseManager.tableNames.get(tableDesc.ParentID, tableName, s.Clock()); lease == nil { t.Fatalf("name cache has no unexpired entry for (%d, %s)", tableDesc.ParentID, tableName) } else { - if err := leaseManager.Release(lease); err != nil { + if err := leaseManager.Release(lease.TableDescriptor); err != nil { t.Fatal(err) } } @@ -332,7 +347,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); if lease == nil { t.Fatalf("no name cache entry") } - if err := leaseManager.Release(lease); err != nil { + if err := leaseManager.Release(lease.TableDescriptor); err != nil { t.Fatal(err) } } @@ -367,15 +382,15 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test") // Populate the name cache. - var lease *LeaseState + var table sqlbase.TableDescriptor if err := kvDB.Txn(context.TODO(), func(ctx context.Context, txn *client.Txn) error { var err error - lease, err = leaseManager.AcquireByName(ctx, txn, tableDesc.ParentID, "test") + table, _, err = leaseManager.AcquireByName(ctx, txn, tableDesc.ParentID, "test") return err }); err != nil { t.Fatal(err) } - if err := leaseManager.Release(lease); err != nil { + if err := leaseManager.Release(table); err != nil { t.Fatal(err) } @@ -386,22 +401,22 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); // Try to trigger the race repeatedly: race an AcquireByName against a // Release. - // leaseChan acts as a barrier, synchornizing the two routines at every + // tableChan acts as a barrier, synchornizing the two routines at every // iteration. - leaseChan := make(chan *LeaseState) + tableChan := make(chan sqlbase.TableDescriptor) errChan := make(chan error) go func() { - for lease := range leaseChan { + for table := range tableChan { // Move errors to the main goroutine. - errChan <- leaseManager.Release(lease) + errChan <- leaseManager.Release(table) } }() for i := 0; i < 50; i++ { - var leaseByName *LeaseState + var tableByName sqlbase.TableDescriptor if err := kvDB.Txn(context.TODO(), func(ctx context.Context, txn *client.Txn) error { var err error - lease, err := leaseManager.AcquireByName(ctx, txn, tableDesc.ParentID, "test") + table, _, err := leaseManager.AcquireByName(ctx, txn, tableDesc.ParentID, "test") if err != nil { t.Fatal(err) } @@ -412,15 +427,15 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); // removed from the store until they expire, and the jitter is small // compared to their lifetime, but it is a problem in this test because // we churn through leases quickly. - tracker := removalTracker.TrackRemoval(lease) + tracker := removalTracker.TrackRemoval(table) // Start the race: signal the other guy to release, and we do another // acquire at the same time. - leaseChan <- lease - leaseByName, err = leaseManager.AcquireByName(ctx, txn, tableDesc.ParentID, "test") + tableChan <- table + tableByName, _, err = leaseManager.AcquireByName(ctx, txn, tableDesc.ParentID, "test") if err != nil { t.Fatal(err) } - tracker2 := removalTracker.TrackRemoval(leaseByName) + tracker2 := removalTracker.TrackRemoval(tableByName) // See if there was an error releasing lease. err = <-errChan if err != nil { @@ -430,21 +445,15 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); // Depending on how the race went, there are two cases - either the // AcquireByName ran first, and got the same lease as we already had, // or the Release ran first and so we got a new lease. - if leaseByName == lease { - if lease.Refcount() != 1 { - t.Fatalf("expected refcount 1, got %d", lease.Refcount()) - } - if err := leaseManager.Release(lease); err != nil { + if tableByName.ID == table.ID { + if err := leaseManager.Release(table); err != nil { t.Fatal(err) } if err := tracker.WaitForRemoval(); err != nil { t.Fatal(err) } } else { - if lease.Refcount() != 0 { - t.Fatalf("expected refcount 0, got %d", lease.Refcount()) - } - if err := leaseManager.Release(leaseByName); err != nil { + if err := leaseManager.Release(tableByName); err != nil { t.Fatal(err) } if err := tracker2.WaitForRemoval(); err != nil { @@ -456,7 +465,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); t.Fatal(err) } } - close(leaseChan) + close(tableChan) } // TestAcquireFreshestFromStoreRaces runs @@ -483,11 +492,11 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); go func() { defer wg.Done() err := kvDB.Txn(context.TODO(), func(ctx context.Context, txn *client.Txn) error { - lease, err := leaseManager.acquireFreshestFromStore(ctx, txn, tableDesc.ID) + table, _, err := leaseManager.acquireFreshestFromStore(ctx, txn, tableDesc.ID) if err != nil { return err } - if err := leaseManager.Release(lease); err != nil { + if err := leaseManager.Release(table); err != nil { return err } return nil diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index 4153ae020fd3..5aeac9ad31d7 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -109,28 +110,29 @@ func (t *leaseTest) expectLeases(descID sqlbase.ID, expected string) { func (t *leaseTest) acquire( nodeID uint32, descID sqlbase.ID, version sqlbase.DescriptorVersion, -) (*sql.LeaseState, error) { - var lease *sql.LeaseState +) (sqlbase.TableDescriptor, hlc.Timestamp, error) { + var table sqlbase.TableDescriptor + var expiration hlc.Timestamp err := t.kvDB.Txn(context.TODO(), func(ctx context.Context, txn *client.Txn) error { var err error - lease, err = t.node(nodeID).Acquire(ctx, txn, descID, version) + table, expiration, err = t.node(nodeID).Acquire(ctx, txn, descID, version) return err }) - return lease, err + return table, expiration, err } func (t *leaseTest) mustAcquire( nodeID uint32, descID sqlbase.ID, version sqlbase.DescriptorVersion, -) *sql.LeaseState { - lease, err := t.acquire(nodeID, descID, version) +) (sqlbase.TableDescriptor, hlc.Timestamp) { + table, expiration, err := t.acquire(nodeID, descID, version) if err != nil { t.Fatal(err) } - return lease + return table, expiration } -func (t *leaseTest) release(nodeID uint32, lease *sql.LeaseState) error { - return t.node(nodeID).Release(lease) +func (t *leaseTest) release(nodeID uint32, table sqlbase.TableDescriptor) error { + return t.node(nodeID).Release(table) } // If leaseRemovalTracker is not nil, it will be used to block until the lease is @@ -138,13 +140,13 @@ func (t *leaseTest) release(nodeID uint32, lease *sql.LeaseState) error { // store (i.e. it's not expired and it's not for an old descriptor version), // this shouldn't be set. func (t *leaseTest) mustRelease( - nodeID uint32, lease *sql.LeaseState, leaseRemovalTracker *sql.LeaseRemovalTracker, + nodeID uint32, table sqlbase.TableDescriptor, leaseRemovalTracker *sql.LeaseRemovalTracker, ) { var tracker sql.RemovalTracker if leaseRemovalTracker != nil { - tracker = leaseRemovalTracker.TrackRemoval(lease) + tracker = leaseRemovalTracker.TrackRemoval(table) } - if err := t.release(nodeID, lease); err != nil { + if err := t.release(nodeID, table); err != nil { t.Fatal(err) } if leaseRemovalTracker != nil { @@ -203,11 +205,11 @@ func TestLeaseManager(testingT *testing.T) { // We can't acquire a lease on a non-existent table. expected := "descriptor not found" - if _, err := t.acquire(1, 10000, 0); !testutils.IsError(err, expected) { + if _, _, err := t.acquire(1, 10000, 0); !testutils.IsError(err, expected) { t.Fatalf("expected %s, but found %v", expected, err) } - l1 := t.mustAcquire(1, descID, 0) + l1, _ := t.mustAcquire(1, descID, 0) t.expectLeases(descID, "/1/1") // Node 2 never acquired a lease on descID, so we should expect an error. if err := t.release(2, l1); err == nil { @@ -219,14 +221,14 @@ func TestLeaseManager(testingT *testing.T) { // It is an error to acquire a lease for a specific version that doesn't // exist yet. expected = "version 2 of table .* does not exist" - if _, err := t.acquire(1, descID, 2); !testutils.IsError(err, expected) { + if _, _, err := t.acquire(1, descID, 2); !testutils.IsError(err, expected) { t.Fatalf("expected %s, but found %v", expected, err) } // Publish a new version and explicitly acquire it. - l2 := t.mustAcquire(1, descID, 0) + l2, _ := t.mustAcquire(1, descID, 0) t.mustPublish(ctx, 1, descID) - l3 := t.mustAcquire(1, descID, 2) + l3, _ := t.mustAcquire(1, descID, 2) t.expectLeases(descID, "/1/1 /2/1") // When the last local reference on the new version is released we don't @@ -236,7 +238,7 @@ func TestLeaseManager(testingT *testing.T) { // We can still acquire a local reference on the old version since it hasn't // expired. - l4 := t.mustAcquire(1, descID, 1) + l4, _ := t.mustAcquire(1, descID, 1) t.mustRelease(1, l4, nil) t.expectLeases(descID, "/1/1 /2/1") @@ -248,13 +250,13 @@ func TestLeaseManager(testingT *testing.T) { // It is an error to acquire a lease for an old version once a new version // exists and there are no local references for the old version. expected = `table \d+ unable to acquire lease on old version: 1 < 2` - if _, err := t.acquire(1, descID, 1); !testutils.IsError(err, expected) { + if _, _, err := t.acquire(1, descID, 1); !testutils.IsError(err, expected) { t.Fatalf("expected %s, but found %v", expected, err) } // Acquire 2 node leases on version 2. - l5 := t.mustAcquire(1, descID, 2) - l6 := t.mustAcquire(2, descID, 2) + l5, _ := t.mustAcquire(1, descID, 2) + l6, _ := t.mustAcquire(2, descID, 2) // Publish version 3. This will succeed immediately. t.mustPublish(ctx, 3, descID) @@ -268,8 +270,8 @@ func TestLeaseManager(testingT *testing.T) { }() // Force both nodes ahead to version 3. - l7 := t.mustAcquire(1, descID, 3) - l8 := t.mustAcquire(2, descID, 3) + l7, _ := t.mustAcquire(1, descID, 3) + l8, _ := t.mustAcquire(2, descID, 3) t.expectLeases(descID, "/2/1 /2/2 /3/1 /3/2") t.mustRelease(1, l5, removalTracker) @@ -279,7 +281,7 @@ func TestLeaseManager(testingT *testing.T) { // Wait for version 4 to be published. wg.Wait() - l9 := t.mustAcquire(1, descID, 4) + l9, _ := t.mustAcquire(1, descID, 4) t.mustRelease(1, l7, removalTracker) t.mustRelease(2, l8, nil) t.expectLeases(descID, "/3/2 /4/1") @@ -303,16 +305,14 @@ func TestLeaseManagerReacquire(testingT *testing.T) { const descID = keys.LeaseTableID - // Acquire 2 leases from the same node. They should point to the same lease - // structure. - l1 := t.mustAcquire(1, descID, 0) - l2 := t.mustAcquire(1, descID, 0) - if l1 != l2 { - t.Fatalf("expected same lease, but found %p != %p", l1, l2) - } - if l1.Refcount() != 2 { - t.Fatalf("expected refcount of 2, but found %d", l1.Refcount()) + // Acquire 2 leases from the same node. They should return the same + // table and expiration. + l1, e1 := t.mustAcquire(1, descID, 0) + l2, e2 := t.mustAcquire(1, descID, 0) + if l1.ID != l2.ID || e1 != e2 { + t.Fatalf("expected same lease, but found %v != %v", l1, l2) } + t.expectLeases(descID, "/1/1") // Set the minimum lease duration such that the next lease acquisition will @@ -321,25 +321,29 @@ func TestLeaseManagerReacquire(testingT *testing.T) { defer func() { sql.LeaseDuration, sql.MinLeaseDuration = savedLeaseDuration, savedMinLeaseDuration }() - sql.MinLeaseDuration = l1.Expiration().Sub(timeutil.Now()) + + sql.MinLeaseDuration = time.Unix(0, e1.WallTime).Sub(timeutil.Now()) sql.LeaseDuration = 2 * sql.MinLeaseDuration // Another lease acquisition from the same node will result in a new lease. - l3 := t.mustAcquire(1, descID, 0) - if l1 == l3 { - t.Fatalf("expected different leases, but found %p", l1) - } - if l3.Refcount() != 1 { - t.Fatalf("expected refcount of 1, but found %d", l3.Refcount()) + rt := removalTracker.TrackRemoval(l1) + l3, e3 := t.mustAcquire(1, descID, 0) + if l1.ID == l3.ID && e3.WallTime == e1.WallTime { + t.Fatalf("expected different leases, but found %v", l1) } - if l3.Expiration().Before(l1.Expiration()) { + if e3.WallTime < e1.WallTime { t.Fatalf("expected new lease expiration (%s) to be after old lease expiration (%s)", - l3.Expiration(), l1.Expiration()) + e3, e1) } - t.expectLeases(descID, "/1/1 /1/1") + // In acquiring the new lease the older lease is released. + if err := rt.WaitForRemoval(); err != nil { + t.Fatal(err) + } + // Only one actual lease. + t.expectLeases(descID, "/1/1") t.mustRelease(1, l1, nil) - t.mustRelease(1, l2, removalTracker) + t.mustRelease(1, l2, nil) t.mustRelease(1, l3, nil) } @@ -445,8 +449,8 @@ func TestLeaseManagerDrain(testingT *testing.T) { const descID = keys.LeaseTableID { - l1 := t.mustAcquire(1, descID, 0) - l2 := t.mustAcquire(2, descID, 0) + l1, _ := t.mustAcquire(1, descID, 0) + l2, _ := t.mustAcquire(2, descID, 0) t.mustRelease(1, l1, nil) t.expectLeases(descID, "/1/1 /1/2") @@ -458,7 +462,7 @@ func TestLeaseManagerDrain(testingT *testing.T) { t.nodes[2].SetDraining(true) // Leases cannot be acquired when in draining mode. - if _, err := t.acquire(1, descID, 0); !testutils.IsError(err, "cannot acquire lease when draining") { + if _, _, err := t.acquire(1, descID, 0); !testutils.IsError(err, "cannot acquire lease when draining") { t.Fatalf("unexpected error: %v", err) } @@ -479,7 +483,7 @@ func TestLeaseManagerDrain(testingT *testing.T) { // Check that leases with a refcount of 0 are correctly kept in the // store once the drain mode has been exited. t.nodes[1].SetDraining(false) - l1 := t.mustAcquire(1, descID, 0) + l1, _ := t.mustAcquire(1, descID, 0) t.mustRelease(1, l1, nil) t.expectLeases(descID, "/1/1") } @@ -534,7 +538,7 @@ CREATE TABLE test.t(a INT PRIMARY KEY); tableDesc := sqlbase.GetTableDescriptor(t.kvDB, "test", "t") // try to acquire at a bogus version to make sure we don't get back a lease we // already had. - _, err = t.acquire(1, tableDesc.ID, tableDesc.Version+1) + _, _, err = t.acquire(1, tableDesc.ID, tableDesc.Version+1) if !testutils.IsError(err, "table is being dropped") { t.Fatalf("got a different error than expected: %v", err) } @@ -556,14 +560,15 @@ func isDeleted(tableID sqlbase.ID, cfg config.SystemConfig) bool { func acquire( ctx context.Context, s *server.TestServer, descID sqlbase.ID, version sqlbase.DescriptorVersion, -) (*sql.LeaseState, error) { - var lease *sql.LeaseState +) (sqlbase.TableDescriptor, hlc.Timestamp, error) { + var table sqlbase.TableDescriptor + var expiration hlc.Timestamp err := s.DB().Txn(context.TODO(), func(ctx context.Context, txn *client.Txn) error { var err error - lease, err = s.LeaseManager().(*sql.LeaseManager).Acquire(ctx, txn, descID, version) + table, expiration, err = s.LeaseManager().(*sql.LeaseManager).Acquire(ctx, txn, descID, version) return err }) - return lease, err + return table, expiration, err } // Test that once a table is marked as deleted, a lease's refcount dropping to 0 @@ -618,11 +623,11 @@ CREATE TABLE test.t(a INT PRIMARY KEY); tableDesc := sqlbase.GetTableDescriptor(kvDB, "test", "t") ctx := context.TODO() - lease1, err := acquire(ctx, s.(*server.TestServer), tableDesc.ID, 0) + lease1, _, err := acquire(ctx, s.(*server.TestServer), tableDesc.ID, 0) if err != nil { t.Fatal(err) } - lease2, err := acquire(ctx, s.(*server.TestServer), tableDesc.ID, 0) + lease2, _, err := acquire(ctx, s.(*server.TestServer), tableDesc.ID, 0) if err != nil { t.Fatal(err) } @@ -645,7 +650,7 @@ CREATE TABLE test.t(a INT PRIMARY KEY); <-deleted // We should still be able to acquire, because we have an active lease. - lease3, err := acquire(ctx, s.(*server.TestServer), tableDesc.ID, 0) + lease3, _, err := acquire(ctx, s.(*server.TestServer), tableDesc.ID, 0) if err != nil { t.Fatal(err) } @@ -661,7 +666,7 @@ CREATE TABLE test.t(a INT PRIMARY KEY); t.Fatal(err) } // Now we shouldn't be able to acquire any more. - _, err = acquire(ctx, s.(*server.TestServer), tableDesc.ID, 0) + _, _, err = acquire(ctx, s.(*server.TestServer), tableDesc.ID, 0) if !testutils.IsError(err, "table is being dropped") { t.Fatalf("got a different error than expected: %v", err) } diff --git a/pkg/sql/parallel_stmts_test.go b/pkg/sql/parallel_stmts_test.go index 75a13d48902e..ca854141f138 100644 --- a/pkg/sql/parallel_stmts_test.go +++ b/pkg/sql/parallel_stmts_test.go @@ -313,7 +313,7 @@ func planNodeForQuery( txn := client.NewTxn(kvDB) txn.Proto().OrigTimestamp = s.Clock().Now() p := makeInternalPlanner("plan", txn, security.RootUser, &MemoryMetrics{}) - p.session.leases.leaseMgr = s.LeaseManager().(*LeaseManager) + p.session.tables.leaseMgr = s.LeaseManager().(*LeaseManager) p.session.Database = "test" stmts, err := p.parser.Parse(sql) diff --git a/pkg/sql/pgwire/pgwire_test.go b/pkg/sql/pgwire/pgwire_test.go index 00af472d52aa..dbb3005a272a 100644 --- a/pkg/sql/pgwire/pgwire_test.go +++ b/pkg/sql/pgwire/pgwire_test.go @@ -437,6 +437,7 @@ func TestPGPrepareFail(t *testing.T) { // transaction. func TestPGPrepareWithCreateDropInTxn(t *testing.T) { defer leaktest.AfterTest(t)() + s, _, _ := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(context.TODO()) diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 94df9ee1307a..49749e224ec2 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -98,7 +98,7 @@ func makeInternalPlanner( User: user, TxnState: txnState{Ctx: ctx}, context: ctx, - leases: LeaseCollection{databaseCache: newDatabaseCache(config.SystemConfig{})}, + tables: TableCollection{databaseCache: newDatabaseCache(config.SystemConfig{})}, } s.mon = mon.MakeUnlimitedMonitor(ctx, @@ -144,7 +144,7 @@ func (p *planner) ExecCfg() *ExecutorConfig { } func (p *planner) LeaseMgr() *LeaseManager { - return p.session.leases.leaseMgr + return p.session.tables.leaseMgr } func (p *planner) User() string { @@ -261,7 +261,7 @@ func (p *planner) exec(ctx context.Context, sql string, args ...interface{}) (in func (p *planner) fillFKTableMap(ctx context.Context, m sqlbase.TableLookupsByID) error { for tableID := range m { - table, err := p.session.leases.getTableLeaseByID(ctx, p.txn, tableID) + table, err := p.session.tables.getTableVersionByID(ctx, p.txn, tableID) if err == errTableAdding { m[tableID] = sqlbase.TableLookup{IsAdding: true} continue diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 554c4d08c0e5..63b805d8dad0 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -1272,11 +1272,11 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT); // Grab a lease at the latest version so that we are confident // that all future leases will be taken at the latest version. if err := kvDB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { - lease, err := leaseMgr.Acquire(ctx, txn, id, version+1) + table, _, err := leaseMgr.Acquire(ctx, txn, id, version+1) if err != nil { return err } - return leaseMgr.Release(lease) + return leaseMgr.Release(table) }); err != nil { t.Error(err) } diff --git a/pkg/sql/session.go b/pkg/sql/session.go index 1972e5b6ef8f..b2ff42e026fa 100644 --- a/pkg/sql/session.go +++ b/pkg/sql/session.go @@ -285,7 +285,7 @@ type Session struct { Tracing SessionTracing - leases LeaseCollection + tables TableCollection // If set, contains the in progress COPY FROM columns. copyFrom *copyNode @@ -422,7 +422,7 @@ func NewSession( applicationName: args.ApplicationName, database: args.Database, }, - leases: LeaseCollection{ + tables: TableCollection{ leaseMgr: e.cfg.LeaseManager, databaseCache: e.getDatabaseCache(), }, @@ -497,7 +497,7 @@ func (s *Session) Finish(e *Executor) { // Cleanup leases. We might have unreleased leases if we're finishing the // session abruptly in the middle of a transaction, or, until #7648 is // addressed, there might be leases accumulated by preparing statements. - s.leases.releaseLeases(s.context) + s.tables.releaseTables(s.context) s.ClearStatementsAndPortals(s.context) s.sessionMon.Stop(s.context) @@ -535,7 +535,7 @@ func (s *Session) EmergencyClose() { _ = s.parallelizeQueue.Wait() // Release the leases - to ensure other sessions don't get stuck. - s.leases.releaseLeases(s.context) + s.tables.releaseTables(s.context) // The KV txn may be unusable - just leave it dead. Simply // shut down its memory monitor. @@ -637,22 +637,20 @@ func (s *Session) evalCtx() parser.EvalContext { func (s *Session) resetForBatch(e *Executor) { // Update the database cache to a more recent copy, so that we can use tables // that we created in previous batches of the same transaction. - s.leases.databaseCache = e.getDatabaseCache() + s.tables.databaseCache = e.getDatabaseCache() s.TxnState.schemaChangers.curGroupNum++ } -// releaseLeases releases all leases currently held by the Session. -func (lc *LeaseCollection) releaseLeases(ctx context.Context) { - if lc.leases != nil { - if log.V(2) { - log.VEventf(ctx, 2, "releasing %d leases", len(lc.leases)) - } - for _, lease := range lc.leases { - if err := lc.leaseMgr.Release(lease); err != nil { +// releaseTables releases all tables currently held by the Session. +func (tc *TableCollection) releaseTables(ctx context.Context) { + if tc.tables != nil { + log.VEventf(ctx, 2, "releasing %d tables", len(tc.tables)) + for _, table := range tc.tables { + if err := tc.leaseMgr.Release(table); err != nil { log.Warning(ctx, err) } } - lc.leases = nil + tc.tables = nil } } @@ -1042,7 +1040,7 @@ func (scc *schemaChangerCollection) execSchemaChanges( ctx context.Context, e *Executor, session *Session, results ResultList, ) { // Release the leases once a transaction is complete. - session.leases.releaseLeases(ctx) + session.tables.releaseTables(ctx) if e.cfg.SchemaChangerTestingKnobs.SyncFilter != nil { e.cfg.SchemaChangerTestingKnobs.SyncFilter(TestingSchemaChangerCollection{scc}) } diff --git a/pkg/sql/show.go b/pkg/sql/show.go index 0bc32d9f9d0e..3e71c29dfdbb 100644 --- a/pkg/sql/show.go +++ b/pkg/sql/show.go @@ -427,7 +427,7 @@ func (p *planner) showCreateTable( buf.WriteString(primary) for _, idx := range desc.Indexes { if fk := idx.ForeignKey; fk.IsSet() { - fkTable, err := p.session.leases.getTableLeaseByID(ctx, p.txn, fk.Table) + fkTable, err := p.session.tables.getTableVersionByID(ctx, p.txn, fk.Table) if err != nil { return "", err } diff --git a/pkg/sql/show_fingerprints.go b/pkg/sql/show_fingerprints.go index 731c13b66948..ce84c76f2424 100644 --- a/pkg/sql/show_fingerprints.go +++ b/pkg/sql/show_fingerprints.go @@ -177,7 +177,7 @@ func (n *showFingerprintsNode) Next(ctx context.Context) (bool, error) { // need to set `avoidCachedDescriptors`. p := makeInternalPlanner("SELECT", txn, security.RootUser, n.p.LeaseMgr().memMetrics) defer finishInternalPlanner(p) - p.session.leases.leaseMgr = n.p.LeaseMgr() + p.session.tables.leaseMgr = n.p.LeaseMgr() p.avoidCachedDescriptors = true var err error diff --git a/pkg/sql/table.go b/pkg/sql/table.go index 4ee3fb034f75..b43b6052a3a4 100644 --- a/pkg/sql/table.go +++ b/pkg/sql/table.go @@ -71,6 +71,12 @@ func GetKeysForTableDescriptor( return } +// A unique id for a particular table descriptor version. +type tableVersionID struct { + id sqlbase.ID + version sqlbase.DescriptorVersion +} + // SchemaAccessor provides helper methods for using the SQL schema. type SchemaAccessor interface { // NB: one can use GetTableDescFromID() to retrieve a descriptor for @@ -233,9 +239,24 @@ func filterTableState(tableDesc *sqlbase.TableDescriptor) error { return nil } -// getTableLease acquires a lease for the specified table. The lease must -// be released by calling lc.releaseLeases(). -func (lc *LeaseCollection) getTableLease( +func (tc *TableCollection) maybeChangeTimestamp(ctx context.Context, txn *client.Txn) { + if tc.timestamp != (hlc.Timestamp{}) && tc.timestamp != txn.OrigTimestamp() { + txn.ResetDeadline() + tc.releaseTables(ctx) + } +} + +// getTableVersion returns a table descriptor with a version suitable for +// the transaction. The table must be released by calling tc.releaseTables(). +// +// TODO(vivek): The suitability of the table version returned is only partial +// checked. The expiration time for the table descriptor is added as a deadline +// for the transaction. The ModificationTime of the table is not compared to the +// timestamp of the transaction. Fix this to be such that: +// table.ModificationTime <= txn.Timestamp < expirationTime +// +// TODO(vivek): Rollback most of #6418. +func (tc *TableCollection) getTableVersion( ctx context.Context, txn *client.Txn, vt VirtualTabler, tn *parser.TableName, ) (*sqlbase.TableDescriptor, error) { if log.V(2) { @@ -260,56 +281,55 @@ func (lc *LeaseCollection) getTableLease( return tbl, nil } - dbID, err := lc.databaseCache.getDatabaseID(ctx, txn, vt, tn.Database()) + dbID, err := tc.databaseCache.getDatabaseID(ctx, txn, vt, tn.Database()) if err != nil { return nil, err } - // First, look to see if we already have a lease for this table. + // If the txn has been pushed the table collection is released and + // txn deadline is reset. + tc.maybeChangeTimestamp(ctx, txn) + + // First, look to see if we already have the table. // This ensures that, once a SQL transaction resolved name N to id X, it will // continue to use N to refer to X even if N is renamed during the // transaction. - var lease *LeaseState - for _, l := range lc.leases { - if parser.ReNormalizeName(l.Name) == tn.TableName.Normalize() && - l.ParentID == dbID { - lease = l + for _, table := range tc.tables { + if parser.ReNormalizeName(table.Name) == tn.TableName.Normalize() && + table.ParentID == dbID { if log.V(2) { - log.Infof(ctx, "found lease in planner cache for table '%s'", tn) + log.Infof(ctx, "found table in table collection for table '%s'", tn) } - break + return &table, nil } } - // If we didn't find a lease or the lease is about to expire, acquire one. - if lease == nil || lc.removeLeaseIfExpiring(ctx, txn, lease) { - var err error - lease, err = lc.leaseMgr.AcquireByName(ctx, txn, dbID, tn.Table()) - if err != nil { - if err == sqlbase.ErrDescriptorNotFound { - // Transform the descriptor error into an error that references the - // table's name. - return nil, sqlbase.NewUndefinedTableError(tn.String()) - } - return nil, err - } - lc.leases = append(lc.leases, lease) - if log.V(2) { - log.Infof(ctx, "added lease on table '%s' to planner cache", tn) + table, expiration, err := tc.leaseMgr.AcquireByName(ctx, txn, dbID, tn.Table()) + if err != nil { + if err == sqlbase.ErrDescriptorNotFound { + // Transform the descriptor error into an error that references the + // table's name. + return nil, sqlbase.NewUndefinedTableError(tn.String()) } - // If the lease we just acquired expires before the txn's deadline, reduce - // the deadline. - txn.UpdateDeadlineMaybe(hlc.Timestamp{WallTime: lease.Expiration().UnixNano()}) + return nil, err + } + tc.timestamp = txn.OrigTimestamp() + tc.tables = append(tc.tables, table) + if log.V(2) { + log.Infof(ctx, "added table '%s' to table collection", tn) } - return &lease.TableDescriptor, nil + // If the table we just acquired expires before the txn's deadline, reduce + // the deadline. + txn.UpdateDeadlineMaybe(expiration) + return &table, nil } -// getTableLeaseByID is a by-ID variant of getTableLease (i.e. uses same cache). -func (lc *LeaseCollection) getTableLeaseByID( +// getTableVersionByID is a by-ID variant of getTableVersion (i.e. uses same cache). +func (tc *TableCollection) getTableVersionByID( ctx context.Context, txn *client.Txn, tableID sqlbase.ID, ) (*sqlbase.TableDescriptor, error) { if log.V(2) { - log.Infof(ctx, "planner acquiring lease on table ID %d", tableID) + log.Infof(ctx, "planner getting table on table ID %d", tableID) } if testDisableTableLeases { @@ -323,74 +343,39 @@ func (lc *LeaseCollection) getTableLeaseByID( return table, nil } - // First, look to see if we already have a lease for this table -- including - // leases acquired via `getTableLease`. - var lease *LeaseState - for _, l := range lc.leases { - if l.ID == tableID { - lease = l - if log.V(2) { - log.Infof(ctx, "found lease in planner cache for table %d", tableID) - } - break - } - } + // If the txn has been pushed the table collection is released and + // txn deadline is reset. + tc.maybeChangeTimestamp(ctx, txn) - // If we didn't find a lease or the lease is about to expire, acquire one. - if lease == nil || lc.removeLeaseIfExpiring(ctx, txn, lease) { - var err error - lease, err = lc.leaseMgr.Acquire(ctx, txn, tableID, 0) - if err != nil { - if err == sqlbase.ErrDescriptorNotFound { - // Transform the descriptor error into an error that references the - // table's ID. - return nil, sqlbase.NewUndefinedTableError(fmt.Sprintf("", tableID)) + // First, look to see if we already have the table -- including those + // via `getTableVersion`. + for _, table := range tc.tables { + if table.ID == tableID { + if log.V(2) { + log.Infof(ctx, "found table %d in table cache", tableID) } - return nil, err + return &table, nil } - lc.leases = append(lc.leases, lease) - // If the lease we just acquired expires before the txn's deadline, reduce - // the deadline. - txn.UpdateDeadlineMaybe(hlc.Timestamp{WallTime: lease.Expiration().UnixNano()}) } - return &lease.TableDescriptor, nil -} -// removeLeaseIfExpiring removes a lease and returns true if it is about to expire. -// The method also resets the transaction deadline. -func (lc *LeaseCollection) removeLeaseIfExpiring( - ctx context.Context, txn *client.Txn, lease *LeaseState, -) bool { - if lease == nil || lease.hasSomeLifeLeft(lc.leaseMgr.clock) { - return false - } - - // Remove the lease from session.leases. - idx := -1 - for i, l := range lc.leases { - if l == lease { - idx = i - break + table, expiration, err := tc.leaseMgr.Acquire(ctx, txn, tableID, 0) + if err != nil { + if err == sqlbase.ErrDescriptorNotFound { + // Transform the descriptor error into an error that references the + // table's ID. + return nil, sqlbase.NewUndefinedTableError(fmt.Sprintf("", tableID)) } + return nil, err } - if idx == -1 { - log.Warningf(ctx, "lease (%s) not found", lease) - return false - } - lc.leases[idx] = lc.leases[len(lc.leases)-1] - lc.leases[len(lc.leases)-1] = nil - lc.leases = lc.leases[:len(lc.leases)-1] - - if err := lc.leaseMgr.Release(lease); err != nil { - log.Warning(ctx, err) - } - - // Reset the deadline so that a new deadline will be set after the lease is acquired. - txn.ResetDeadline() - for _, l := range lc.leases { - txn.UpdateDeadlineMaybe(hlc.Timestamp{WallTime: l.Expiration().UnixNano()}) + tc.timestamp = txn.OrigTimestamp() + tc.tables = append(tc.tables, table) + if log.V(2) { + log.Infof(ctx, "added table '%s' to table collection", table.Name) } - return true + // If the table we just acquired expires before the txn's deadline, reduce + // the deadline. + txn.UpdateDeadlineMaybe(expiration) + return &table, nil } // getTableNames retrieves the list of qualified names of tables @@ -452,7 +437,7 @@ func (p *planner) createSchemaChangeJob( DescriptorIDs: sqlbase.IDs{tableDesc.GetID()}, Details: jobs.SchemaChangeJobDetails{}, } - jobLogger := jobs.NewJobLogger(p.ExecCfg().DB, InternalExecutor{LeaseManager: p.session.leases.leaseMgr}, jobRecord) + jobLogger := jobs.NewJobLogger(p.ExecCfg().DB, InternalExecutor{LeaseManager: p.session.tables.leaseMgr}, jobRecord) if err := jobLogger.WithTxn(p.txn).Created(ctx); err != nil { return sqlbase.InvalidMutationID, nil } @@ -533,7 +518,7 @@ func expandTableGlob( func (p *planner) searchAndQualifyDatabase(ctx context.Context, tn *parser.TableName) error { t := *tn - descFunc := p.session.leases.getTableLease + descFunc := p.session.tables.getTableVersion if p.avoidCachedDescriptors { // AS OF SYSTEM TIME queries need to fetch the table descriptor at the // specified time, and never lease anything. The proto transaction already diff --git a/pkg/sql/table_test.go b/pkg/sql/table_test.go index 3564849f74f0..b3936e166dbc 100644 --- a/pkg/sql/table_test.go +++ b/pkg/sql/table_test.go @@ -19,15 +19,12 @@ package sql import ( "reflect" "testing" - "time" "golang.org/x/net/context" - "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" ) @@ -269,54 +266,3 @@ func TestPrimaryKeyUnspecified(t *testing.T) { t.Fatalf("unexpected error: %v", err) } } - -func TestRemoveLeaseIfExpiring(t *testing.T) { - defer leaktest.AfterTest(t)() - - mc := hlc.NewManualClock(123) - lc := &LeaseCollection{ - leaseMgr: &LeaseManager{ - LeaseStore: LeaseStore{clock: hlc.NewClock(mc.UnixNano, time.Nanosecond)}, - }, - } - - var txn client.Txn - - if lc.removeLeaseIfExpiring(context.TODO(), &txn, nil) { - t.Error("expected false with nil input") - } - - // Add a lease to the planner. - d := int64(LeaseDuration) - l1 := &LeaseState{expiration: parser.DTimestamp{Time: time.Unix(0, mc.UnixNano()+d+1)}} - lc.leases = append(lc.leases, l1) - et := hlc.Timestamp{WallTime: l1.Expiration().UnixNano()} - txn.UpdateDeadlineMaybe(et) - - if lc.removeLeaseIfExpiring(context.TODO(), &txn, l1) { - t.Error("expected false with a non-expiring lease") - } - if d := *txn.GetDeadline(); d != et { - t.Errorf("expected deadline %s but got %s", et, d) - } - - // Advance the clock so that l1 will be expired. - mc.Increment(d + 1) - - // Add another lease. - l2 := &LeaseState{expiration: parser.DTimestamp{Time: time.Unix(0, mc.UnixNano()+d+1)}} - lc.leases = append(lc.leases, l2) - if !lc.removeLeaseIfExpiring(context.TODO(), &txn, l1) { - t.Error("expected true with an expiring lease") - } - et = hlc.Timestamp{WallTime: l2.Expiration().UnixNano()} - txn.UpdateDeadlineMaybe(et) - - if !(len(lc.leases) == 1 && lc.leases[0] == l2) { - t.Errorf("expected leases to contain %s but has %s", l2, lc.leases) - } - - if d := *txn.GetDeadline(); d != et { - t.Errorf("expected deadline %s, but got %s", et, d) - } -} diff --git a/pkg/sql/truncate.go b/pkg/sql/truncate.go index 6eee92005eae..52f825b2b6a9 100644 --- a/pkg/sql/truncate.go +++ b/pkg/sql/truncate.go @@ -52,7 +52,7 @@ func (p *planner) Truncate(ctx context.Context, n *parser.Truncate) (planNode, e return nil, err } - tableDesc, err := p.session.leases.getTableLease(ctx, p.txn, p.getVirtualTabler(), tn) + tableDesc, err := p.session.tables.getTableVersion(ctx, p.txn, p.getVirtualTabler(), tn) if err != nil { return nil, err } @@ -77,7 +77,7 @@ func (p *planner) Truncate(ctx context.Context, n *parser.Truncate) (planNode, e continue } - other, err := p.session.leases.getTableLeaseByID(ctx, p.txn, ref.Table) + other, err := p.session.tables.getTableVersionByID(ctx, p.txn, ref.Table) if err != nil { return nil, err } diff --git a/pkg/sql/update.go b/pkg/sql/update.go index 397d123aad34..2c7bf1289d09 100644 --- a/pkg/sql/update.go +++ b/pkg/sql/update.go @@ -43,7 +43,7 @@ type editNodeBase struct { func (p *planner) makeEditNode( ctx context.Context, tn *parser.TableName, priv privilege.Kind, ) (editNodeBase, error) { - tableDesc, err := p.session.leases.getTableLease(ctx, p.txn, p.getVirtualTabler(), tn) + tableDesc, err := p.session.tables.getTableVersion(ctx, p.txn, p.getVirtualTabler(), tn) if err != nil { return editNodeBase{}, err }