diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index 6dc95659b800..f2c900470385 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -138,6 +138,11 @@ type Collection struct { // It must be set in the multi-tenant environment for ephemeral // SQL pods. It should not be set otherwise. sqlLivenessSession sqlliveness.Session + + // LeaseGeneration is the first generation value observed by this + // txn. This guarantees the generation for long-running transactions + // this value stays the same for the life of the transaction. + leaseGeneration int64 } // FromTxn is a convenience function to extract a descs.Collection which is @@ -199,6 +204,7 @@ func (tc *Collection) ReleaseLeases(ctx context.Context) { tc.leased.releaseAll(ctx) // Clear the associated sqlliveness.session tc.sqlLivenessSession = nil + tc.leaseGeneration = 0 } // ReleaseAll releases all state currently held by the Collection. @@ -210,11 +216,27 @@ func (tc *Collection) ReleaseAll(ctx context.Context) { tc.skipValidationOnWrite = false } +// ResetLeaseGeneration selects an initial value at the beginning of a txn +// for lease generation. +func (tc *Collection) ResetLeaseGeneration() { + // Note: If a collection doesn't have a lease manager assigned, then + // no generation will be selected. This can only happen with either + // bare-bones collections or test cases. + if tc.leased.lm != nil { + tc.leaseGeneration = tc.leased.lm.GetLeaseGeneration() + } +} + // GetLeaseGeneration provides an integer which will change whenever new // descriptor versions are available. This can be used for fast comparisons // to make sure previously looked up information is still valid. func (tc *Collection) GetLeaseGeneration() int64 { - return tc.leased.lm.GetLeaseGeneration() + // Sanity: Pick a lease generation if one hasn't been set. + if tc.leaseGeneration == 0 { + tc.ResetLeaseGeneration() + } + // Return the cached lease generation, one should have been set earlier. + return tc.leaseGeneration } // HasUncommittedTables returns true if the Collection contains uncommitted diff --git a/pkg/sql/catalog/descs/factory.go b/pkg/sql/catalog/descs/factory.go index 2db5048b50ca..25766bfda7a3 100644 --- a/pkg/sql/catalog/descs/factory.go +++ b/pkg/sql/catalog/descs/factory.go @@ -144,12 +144,19 @@ func (cf *CollectionFactory) NewCollection(ctx context.Context, options ...Optio opt(&cfg) } v := cf.settings.Version.ActiveVersion(ctx) + // If the leaseMgr is nil then ensure we have a nil LeaseManager interface, + // otherwise comparisons against a nil implementation will fail. + var lm LeaseManager + lm = cf.leaseMgr + if cf.leaseMgr == nil { + lm = nil + } return &Collection{ settings: cf.settings, version: v, hydrated: cf.hydrated, virtual: makeVirtualDescriptors(cf.virtualSchemas), - leased: makeLeasedDescriptors(cf.leaseMgr), + leased: makeLeasedDescriptors(lm), uncommitted: makeUncommittedDescriptors(cfg.monitor), uncommittedComments: makeUncommittedComments(), uncommittedZoneConfigs: makeUncommittedZoneConfigs(), diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 583b5f3279d5..13842a51fa38 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -1281,6 +1281,7 @@ func NewLeaseManager( sem: quotapool.NewIntPool("lease manager", leaseConcurrencyLimit), refreshAllLeases: make(chan struct{}), } + lm.leaseGeneration.Swap(1) // Start off with 1 as the initial value. lm.storage.regionPrefix = &atomic.Value{} lm.storage.regionPrefix.Store(enum.One) lm.storage.sessionBasedLeasingMode = lm diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 6fda0a3770f9..1e3add294f73 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -4010,7 +4010,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( // Session is considered active when executing a transaction. ex.totalActiveTimeStopWatch.Start() - if err := ex.maybeSetSQLLivenessSession(); err != nil { + if err := ex.maybeSetSQLLivenessSessionAndGeneration(); err != nil { return advanceInfo{}, err } case txnCommit: @@ -4115,7 +4115,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( // In addition to resetting the extraTxnState, the restart event may // also need to reset the sqlliveness.Session. ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent, payloadErr) - if err := ex.maybeSetSQLLivenessSession(); err != nil { + if err := ex.maybeSetSQLLivenessSessionAndGeneration(); err != nil { return advanceInfo{}, err } default: @@ -4186,7 +4186,7 @@ func (ex *connExecutor) waitForTxnJobs() error { return retErr } -func (ex *connExecutor) maybeSetSQLLivenessSession() error { +func (ex *connExecutor) maybeSetSQLLivenessSessionAndGeneration() error { if !ex.server.cfg.Codec.ForSystemTenant() || ex.server.cfg.TestingKnobs.ForceSQLLivenessSession { // Update the leased descriptor collection with the current sqlliveness.Session. @@ -4201,6 +4201,8 @@ func (ex *connExecutor) maybeSetSQLLivenessSession() error { } ex.extraTxnState.descCollection.SetSession(session) } + // Reset the lease generation at the same time. + ex.extraTxnState.descCollection.ResetLeaseGeneration() return nil }