diff --git a/pkg/base/config.go b/pkg/base/config.go index 8f800d77aaa5..a4935086d792 100644 --- a/pkg/base/config.go +++ b/pkg/base/config.go @@ -73,6 +73,18 @@ const ( // with a value of 0.2 and a liveness duration of 10 seconds, each node's // liveness record would be eagerly renewed after 2 seconds. livenessRenewalFraction = 0.5 + + // DefaultTableDescriptorLeaseDuration is the default mean duration a + // lease will be acquired for. The actual duration is jittered using + // the jitter fraction. Jittering is done to prevent multiple leases + // from being renewed simultaneously if they were all acquired + // simultaneously. + DefaultTableDescriptorLeaseDuration = 5 * time.Minute + + // DefaultTableDescriptorLeaseJitterFraction is the default factor + // that we use to randomly jitter the lease duration when acquiring a + // new lease and the lease renewal timeout. + DefaultTableDescriptorLeaseJitterFraction = 0.25 ) var defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt( @@ -466,3 +478,23 @@ func TempStorageConfigFromEnv( Mon: &monitor, } } + +// LeaseManagerConfig holds table lease manager parameters. +type LeaseManagerConfig struct { + // TableDescriptorLeaseDuration is the mean duration a lease will be + // acquired for. + TableDescriptorLeaseDuration time.Duration + + // TableDescriptorLeaseJitterFraction is the factor that we use to + // randomly jitter the lease duration when acquiring a new lease and + // the lease renewal timeout. + TableDescriptorLeaseJitterFraction float64 +} + +// NewLeaseManagerConfig initializes a LeaseManagerConfig with default values. +func NewLeaseManagerConfig() *LeaseManagerConfig { + return &LeaseManagerConfig{ + TableDescriptorLeaseDuration: DefaultTableDescriptorLeaseDuration, + TableDescriptorLeaseJitterFraction: DefaultTableDescriptorLeaseJitterFraction, + } +} diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index a24ba8f2f964..362b6baecf18 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -37,6 +37,9 @@ type TestServerArgs struct { *cluster.Settings RaftConfig + // LeaseManagerConfig holds configuration values specific to the LeaseManager. + LeaseManagerConfig *LeaseManagerConfig + // PartOfCluster must be set if the TestServer is joining others in a cluster. // If not set (and hence the server is the only one in the cluster), the // default zone config will be overridden to disable all replication - so that diff --git a/pkg/server/config.go b/pkg/server/config.go index 53a010100edf..24a2f5bf6f24 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -122,6 +122,9 @@ type Config struct { base.RaftConfig + // LeaseManagerConfig holds configuration values specific to the LeaseManager. + LeaseManagerConfig *base.LeaseManagerConfig + // Unix socket: for postgres only. 
SocketFile string @@ -369,6 +372,7 @@ func MakeConfig(ctx context.Context, st *cluster.Settings) Config { cfg.Config.InitDefaults() cfg.RaftConfig.SetDefaults() + cfg.LeaseManagerConfig = base.NewLeaseManagerConfig() return cfg } diff --git a/pkg/server/server.go b/pkg/server/server.go index f9a0c57c1f98..a7fa247657ae 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -277,7 +277,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { lmKnobs = *leaseManagerTestingKnobs.(*sql.LeaseManagerTestingKnobs) } s.leaseMgr = sql.NewLeaseManager(&s.nodeIDContainer, *s.db, s.clock, lmKnobs, - s.stopper, &s.internalMemMetrics) + s.stopper, &s.internalMemMetrics, s.cfg.LeaseManagerConfig) s.leaseMgr.RefreshLeases(s.stopper, s.db, s.gossip) // We do not set memory monitors or a noteworthy limit because the children of diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 8a1db6bb9e61..fc1da6d18ca5 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -105,6 +105,11 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config { cfg.TestingKnobs = params.Knobs cfg.RaftConfig = params.RaftConfig cfg.RaftConfig.SetDefaults() + if params.LeaseManagerConfig != nil { + cfg.LeaseManagerConfig = params.LeaseManagerConfig + } else { + cfg.LeaseManagerConfig = base.NewLeaseManagerConfig() + } if params.JoinAddr != "" { cfg.JoinList = []string{params.JoinAddr} } diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index ab78d9999559..102789f35161 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -38,19 +38,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) // TODO(pmattis): Periodically renew leases for tables that were used recently and // for which the lease will expire soon. -var ( - // LeaseDuration is the mean duration a lease will be acquired for. The - // actual duration is jittered in the range - // [0.75,1.25]*LeaseDuration. Exported for testing purposes only. - LeaseDuration = 5 * time.Minute -) - // tableVersionState holds the state for a table version. This includes // the lease information for a table version. // TODO(vivek): A node only needs to manage lease information on what it @@ -118,14 +112,26 @@ type LeaseStore struct { clock *hlc.Clock nodeID *base.NodeIDContainer + // leaseDuration is the mean duration a lease will be acquired for. The + // actual duration is jittered using leaseJitterFraction. Jittering is done to + // prevent multiple leases from being renewed simultaneously if they were all + // acquired simultaneously. + leaseDuration time.Duration + // leaseJitterFraction is the factor that we use to randomly jitter the lease + // duration when acquiring a new lease and the lease renewal timeout. The + // range of the actual lease duration will be + // [(1-leaseJitterFraction) * leaseDuration, (1+leaseJitterFraction) * leaseDuration] + leaseJitterFraction float64 + testingKnobs LeaseStoreTestingKnobs memMetrics *MemoryMetrics } // jitteredLeaseDuration returns a randomly jittered duration from the interval -// [0.75 * leaseDuration, 1.25 * leaseDuration]. -func jitteredLeaseDuration() time.Duration { - return time.Duration(float64(LeaseDuration) * (0.75 + 0.5*rand.Float64())) +// [(1-leaseJitterFraction) * leaseDuration, (1+leaseJitterFraction) * leaseDuration]. 
+func (s LeaseStore) jitteredLeaseDuration() time.Duration { + return time.Duration(float64(s.leaseDuration) * (1 - s.leaseJitterFraction + + 2*s.leaseJitterFraction*rand.Float64())) } // acquire a lease on the most recent version of a table descriptor. @@ -137,7 +143,7 @@ func (s LeaseStore) acquire( var table *tableVersionState err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { expiration := txn.OrigTimestamp() - expiration.WallTime += int64(jitteredLeaseDuration()) + expiration.WallTime += int64(s.jitteredLeaseDuration()) if expiration.Less(minExpirationTime) { expiration = minExpirationTime } @@ -441,6 +447,11 @@ func (s LeaseStore) getForExpiration( return table, err } +// leaseToken is an opaque token representing a lease. It's distinct from a +// lease so that we can hand out restricted capabilities: code holding a +// leaseToken can identify the lease but cannot improperly use it. +type leaseToken *tableVersionState + // tableSet maintains an ordered set of tableVersionState objects sorted // by version. It supports addition and removal of elements, finding the // table for a particular version, or finding the most recent table version. @@ -466,6 +477,15 @@ func (l *tableSet) String() string { return buf.String() } +// isNewest checks if the leaseToken represents the newest lease in the +// tableSet. +func (l *tableSet) isNewest(t leaseToken) bool { + if len(l.data) == 0 { + return false + } + return leaseToken(l.data[len(l.data)-1]) == t +} + func (l *tableSet) insert(s *tableVersionState) { i, match := l.findIndex(s.Version) if match { @@ -536,6 +556,8 @@ func (l *tableSet) findVersion(version sqlbase.DescriptorVersion) *tableVersionS return nil } +const acquireGroupKey = "acquire" + type tableState struct { id sqlbase.ID // The cache is updated every time we acquire or release a table. @@ -545,6 +567,9 @@ type tableState struct { mu struct { syncutil.Mutex + // group is used for all calls made to acquireNodeLease to prevent + // concurrent lease acquisitions from the store. + group singleflight.Group // table descriptors sorted by increasing version. This set always // contains a table descriptor version with a lease as the latest // entry. There may be more than one active lease when the system is @@ -554,11 +579,6 @@ type tableState struct { // entry is created with the expiration time of the new lease and // the older entry is removed. active tableSet - // A channel used to indicate whether a lease is actively being - // acquired. nil if there is no lease acquisition in progress for - // the table. If non-nil, the channel will be closed when lease - // acquisition completes. - acquiring chan struct{} // Indicates that the table has been dropped, or is being dropped. // If set, leases are released from the store as soon as their // refcount drops to 0, as opposed to waiting until they expire. @@ -574,17 +594,26 @@ func (t *tableState) acquire( ) (*tableVersionState, error) { t.mu.Lock() defer t.mu.Unlock() - // Wait for any existing lease acquisition. - t.acquireWait() - // Acquire a lease if no lease exists or if the latest lease is - // about to expire. - if s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp) { - if err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}); err != nil { - return nil, err + // Acquire a lease if no lease exists or if the latest lease is about to + // expire.
Looping is necessary because lease acquisition is done without + // holding the tableState lock, so anything can happen in between lease + // acquisition and us getting control again. + for s := t.mu.active.findNewest(); s == nil || s.hasExpired(timestamp); s = t.mu.active.findNewest() { + resultChan, _ := t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) { + return t.acquireNodeLease(ctx, m, hlc.Timestamp{}) + }) + t.mu.Unlock() + if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil { + m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(LeaseAcquireBlock) + } + result := <-resultChan + t.mu.Lock() + if result.Err != nil { + return nil, result.Err + } } - return t.findForTimestamp(ctx, timestamp, m) } @@ -697,20 +726,59 @@ func (t *tableState) findForTimestamp( // // t.mu must be locked. func (t *tableState) acquireFreshestFromStoreLocked(ctx context.Context, m *LeaseManager) error { - // Ensure there is no lease acquisition in progress. - t.acquireWait() - - // Move forward to acquire a fresh table lease. + // We need to acquire a lease on a "fresh" descriptor, meaning that joining + // a potential in-progress lease acquisition is generally not good enough. + // If we are to join an in-progress acquisition, it needs to be an acquisition + // initiated after this point. + // So, we handle two cases: + // 1. The first DoChan() call tells us that we didn't join an in-progress + // acquisition. Great, the lease that's being acquired is good. + // 2. The first DoChan() call tells us that we did join an in-progress + // acquisition. We have to wait this acquisition out; it's not good for us. + // But any future acquisition is good, so the next time around the loop it + // doesn't matter if we initiate a request or join an in-progress one. + // In both cases, we need to check if the lease we want is still valid because + // lease acquisition is done without holding the tableState lock, so anything + // can happen in between lease acquisition and us getting control again. + attemptsMade := 0 + for { + // Move forward the expiry to acquire a fresh table lease. + + // Set the min expiration time to guarantee that the lease acquired is the + // last lease in t.mu.active. + // TODO(vivek): the expiration time is no longer needed to sort the + // tableVersionState. Get rid of this. + minExpirationTime := hlc.Timestamp{} + s := t.mu.active.findNewest() + if s != nil { + minExpirationTime = s.expiration.Add(int64(time.Millisecond), 0) + } - // Set the min expiration time to guarantee that the lease acquired is the - // last lease in t.mu.active . - minExpirationTime := hlc.Timestamp{} - newestTable := t.mu.active.findNewest() - if newestTable != nil { - minExpirationTime = newestTable.expiration.Add(int64(time.Millisecond), 0) + resultChan, wasCalled := t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) { + return t.acquireNodeLease(ctx, m, minExpirationTime) + }) + t.mu.Unlock() + if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil { + m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent(LeaseAcquireFreshestBlock) + } + result := <-resultChan + t.mu.Lock() + if result.Err != nil { + return result.Err + } + l := result.Val.(leaseToken) + if wasCalled && t.mu.active.isNewest(l) { + // Case 1: we didn't join an in-progress call and the lease is still + // valid.
+ break + } else if attemptsMade > 1 && t.mu.active.isNewest(l) { + // Case 2: more than one acquisition has happened and the lease is still + // valid. + break + } + attemptsMade++ } - - return t.acquireNodeLease(ctx, m, minExpirationTime) + return nil } // upsertLocked inserts a lease for a particular table version. @@ -757,18 +825,6 @@ func (t *tableState) removeInactiveVersions(m *LeaseManager) { } } -// acquireWait waits until no lease acquisition is in progress. -func (t *tableState) acquireWait() { - // Spin until no lease acquisition is in progress. - for acquiring := t.mu.acquiring; acquiring != nil; acquiring = t.mu.acquiring { - // We're called with mu locked, but need to unlock it while we wait - // for the in-progress lease acquisition to finish. - t.mu.Unlock() - <-acquiring - t.mu.Lock() - } -} - // If the lease cannot be obtained because the descriptor is in the process of // being dropped, the error will be errTableDropped. // minExpirationTime, if not set to the zero value, will be used as a lower // bound on the expiration of the new table. This can be used to eliminate the // jitter in the expiration time, and guarantee that we get a lease that will be // inserted at the end of the lease set (i.e. it will be returned by // findNewest() from now on). -// -// t.mu needs to be locked. func (t *tableState) acquireNodeLease( ctx context.Context, m *LeaseManager, minExpirationTime hlc.Timestamp, -) error { +) (leaseToken, error) { if m.isDraining() { - return errors.New("cannot acquire lease when draining") + return nil, errors.New("cannot acquire lease when draining") } - - // Notify when lease has been acquired. - t.mu.acquiring = make(chan struct{}) - defer func() { - close(t.mu.acquiring) - t.mu.acquiring = nil - }() - // We're called with mu locked, but need to unlock it during lease - // acquisition. - t.mu.Unlock() table, err := m.LeaseStore.acquire(ctx, t.id, minExpirationTime) - t.mu.Lock() if err != nil { - return err + return nil, err } + t.mu.Lock() + defer t.mu.Unlock() t.upsertLocked(ctx, table, m) t.tableNameCache.insert(table) - return nil + return leaseToken(table), nil } func (t *tableState) release(table *sqlbase.TableDescriptor, m *LeaseManager) error { @@ -925,6 +970,19 @@ func (t *tableState) purgeOldVersions( return err } +// LeaseAcquireBlockType is the type of blocking result event when +// calling LeaseAcquireResultBlockEvent. +type LeaseAcquireBlockType int + +const ( + // LeaseAcquireBlock denotes that the LeaseAcquireResultBlockEvent is + // coming from tableState.acquire(). + LeaseAcquireBlock LeaseAcquireBlockType = iota + // LeaseAcquireFreshestBlock denotes that the LeaseAcquireResultBlockEvent is + // from tableState.acquireFreshestFromStoreLocked(). + LeaseAcquireFreshestBlock ) + // LeaseStoreTestingKnobs contains testing knobs. type LeaseStoreTestingKnobs struct { // Called after a lease is removed from the store, with any operation error. LeaseReleasedEvent func(table sqlbase.TableDescriptor, err error) // Called after a lease is acquired, with any operation error. LeaseAcquiredEvent func(table sqlbase.TableDescriptor, err error) + // Called before waiting on a result from a DoChan call of acquireNodeLease + // in tableState.acquire() and tableState.acquireFreshestFromStoreLocked(). + LeaseAcquireResultBlockEvent func(leaseBlockType LeaseAcquireBlockType) // RemoveOnceDereferenced forces leases to be removed // as soon as they are dereferenced.
RemoveOnceDereferenced bool @@ -1096,14 +1157,17 @@ func NewLeaseManager( testingKnobs LeaseManagerTestingKnobs, stopper *stop.Stopper, memMetrics *MemoryMetrics, + cfg *base.LeaseManagerConfig, ) *LeaseManager { lm := &LeaseManager{ LeaseStore: LeaseStore{ - db: db, - clock: clock, - nodeID: nodeID, - testingKnobs: testingKnobs.LeaseStoreTestingKnobs, - memMetrics: memMetrics, + db: db, + clock: clock, + nodeID: nodeID, + leaseDuration: cfg.TableDescriptorLeaseDuration, + leaseJitterFraction: cfg.TableDescriptorLeaseJitterFraction, + testingKnobs: testingKnobs.LeaseStoreTestingKnobs, + memMetrics: memMetrics, }, testingKnobs: testingKnobs, tableNames: tableNameCache{ diff --git a/pkg/sql/lease_internal_test.go b/pkg/sql/lease_internal_test.go index 7229072cbc76..dcd3771f62c0 100644 --- a/pkg/sql/lease_internal_test.go +++ b/pkg/sql/lease_internal_test.go @@ -19,16 +19,19 @@ package sql import ( "fmt" "sync" + "sync/atomic" "testing" "golang.org/x/net/context" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" ) func TestTableSet(t *testing.T) { @@ -411,7 +414,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); // Try to trigger the race repeatedly: race an AcquireByName against a // Release. - // tableChan acts as a barrier, synchornizing the two routines at every + // tableChan acts as a barrier, synchronizing the two routines at every // iteration. tableChan := make(chan *sqlbase.TableDescriptor) errChan := make(chan error) @@ -558,3 +561,201 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); wg.Wait() } + +// Test one possible outcome of a race between a lease acquisition (the first +// case through tableState.acquire(), the second through +// tableState.acquireFreshestFromStore()) and a release of the lease that was +// just acquired. Precisely: +// 1. Thread 1 calls either acquireFreshestFromStore() or acquire(). +// 2. Thread 1 releases the lock on tableState and starts acquisition of a lease +// from the store, blocking until it's finished. +// 3. Thread 2 calls acquire(). The lease has not been acquired yet, so it +// also enters the acquisition code path (calling DoChan). +// 4. Thread 2 proceeds to release the lock on tableState waiting for the +// in-flight acquisition. +// 5. The lease is acquired from the store and the waiting routines are +// unblocked. +// 6. Thread 2 unblocks first, and releases the new lease, for whatever reason. +// 7. Thread 1 wakes up. At this point, a naive implementation would use the +// newly acquired lease, which would be incorrect. The test checks that +// acquireFreshestFromStore() or acquire() notices, after re-acquiring the +// tableState lock, that the new lease has been released and acquires a new +// one. +func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Result is a struct for moving results to the main result routine. + type Result struct { + table *sqlbase.TableDescriptor + exp hlc.Timestamp + err error + } + + descID := sqlbase.ID(keys.LeaseTableID) + + // acquireBlock calls Acquire.
+ acquireBlock := func( + ctx context.Context, + m *LeaseManager, + acquireChan chan Result, + ) { + table, e, err := m.Acquire(ctx, m.clock.Now(), descID) + acquireChan <- Result{err: err, exp: e, table: table} + } + + testCases := []struct { + name string + // Whether the second routine is a call to LeaseManager.acquireFreshest or + // not. This determines which channel we unblock. + isSecondCallAcquireFreshest bool + }{ + + // Checks what happens when the race between acquire() and + // lease release occurs. + { + name: "CallAcquireConcurrently", + isSecondCallAcquireFreshest: false, + }, + // Checks what happens when the race between + // acquireFreshestFromStore() and lease release occurs. + { + name: "CallAcquireFreshestAndAcquireConcurrently", + isSecondCallAcquireFreshest: true, + }, + } + + for _, test := range testCases { + ctx := log.WithLogTag(context.Background(), "test: Lease", nil) + + t.Run(test.name, func(t *testing.T) { + // blockChan and freshestBlockChan are used to set up the race condition. + blockChan := make(chan struct{}) + freshestBlockChan := make(chan struct{}) + // acquisitionBlock is used to prevent acquireNodeLease from + // completing, to force a lease to delay its acquisition. + acquisitionBlock := make(chan struct{}) + + // preblock is used for the main routine to wait for all acquisition + // routines to catch up. + var preblock sync.WaitGroup + // acquireArrivals and acquireFreshestArrivals track how many times + // we've arrived at the knob codepath for the corresponding functions. + // This is needed because the fix to the race condition hits the knob more + // than once in a single routine, so we need to ignore any extra passes. + var acquireArrivals int32 + var acquireFreshestArrivals int32 + // leasesAcquiredCount counts how many leases were acquired in total. + var leasesAcquiredCount int32 + + removalTracker := NewLeaseRemovalTracker() + testingKnobs := base.TestingKnobs{ + SQLLeaseManager: &LeaseManagerTestingKnobs{ + LeaseStoreTestingKnobs: LeaseStoreTestingKnobs{ + RemoveOnceDereferenced: true, + LeaseReleasedEvent: removalTracker.LeaseRemovedNotification, + LeaseAcquireResultBlockEvent: func(leaseBlockType LeaseAcquireBlockType) { + if leaseBlockType == LeaseAcquireBlock { + if count := atomic.LoadInt32(&acquireArrivals); (count < 1 && test.isSecondCallAcquireFreshest) || + (count < 2 && !test.isSecondCallAcquireFreshest) { + atomic.AddInt32(&acquireArrivals, 1) + preblock.Done() + <-blockChan + } + } else if leaseBlockType == LeaseAcquireFreshestBlock { + if atomic.LoadInt32(&acquireFreshestArrivals) < 1 { + atomic.AddInt32(&acquireFreshestArrivals, 1) + preblock.Done() + <-freshestBlockChan + } + } + }, + LeaseAcquiredEvent: func(_ sqlbase.TableDescriptor, _ error) { + atomic.AddInt32(&leasesAcquiredCount, 1) + <-acquisitionBlock + }, + }, + }, + } + + serverArgs := base.TestServerArgs{Knobs: testingKnobs} + + serverArgs.LeaseManagerConfig = base.NewLeaseManagerConfig() + // The jitter fraction is zero so leases will have + // monotonically increasing expirations. This prevents two leases + // from having the same expiration due to randomness, as the test + // checks that the leases have different expirations. + serverArgs.LeaseManagerConfig.TableDescriptorLeaseJitterFraction = 0.0 + + s, _, _ := serverutils.StartServer( + t, serverArgs) + defer s.Stopper().Stop(context.TODO()) + leaseManager := s.LeaseManager().(*LeaseManager) + + acquireResultChan := make(chan Result) + + // Start two routines to acquire and release.
+ preblock.Add(2) + go acquireBlock(ctx, leaseManager, acquireResultChan) + if test.isSecondCallAcquireFreshest { + go func(ctx context.Context, m *LeaseManager, acquireChan chan Result) { + table, e, err := m.acquireFreshestFromStore(ctx, descID) + acquireChan <- Result{err: err, exp: e, table: table} + }(ctx, leaseManager, acquireResultChan) + + } else { + go acquireBlock(ctx, leaseManager, acquireResultChan) + } + + // Wait until both routines arrive. + preblock.Wait() + + // Allow the acquisition to finish. By delaying it until now, we guarantee + // both routines will receive the same lease. + acquisitionBlock <- struct{}{} + + // Allow the first routine to finish acquisition. In the case where both + // routines are calling Acquire(), first refers to whichever routine + // continues; the order does not matter. + blockChan <- struct{}{} + // Wait for the first routine's results. + result1 := <-acquireResultChan + if result1.err != nil { + t.Fatal(result1.err) + } + + // Release the lease. This also causes it to get removed, as the + // knob RemoveOnceDereferenced is set. + tracker := removalTracker.TrackRemoval(result1.table) + if err := leaseManager.Release(result1.table); err != nil { + t.Fatal(err) + } + // Wait until the lease is removed. + if err := tracker.WaitForRemoval(); err != nil { + t.Fatal(err) + } + + // Allow the second routine to proceed. + if test.isSecondCallAcquireFreshest { + freshestBlockChan <- struct{}{} + } else { + blockChan <- struct{}{} + } + + // Allow all future acquisitions to complete. + close(acquisitionBlock) + + // Get the acquisition results of the second routine. + result2 := <-acquireResultChan + if result2.err != nil { + t.Fatal(result2.err) + } + + if result1.table == result2.table && result1.exp == result2.exp { + t.Fatalf("Expected the leases to be different, but the TableDescriptor pointers and expirations are equal") + } + if count := atomic.LoadInt32(&leasesAcquiredCount); count != 2 { + t.Fatalf("Expected to acquire 2 leases, instead got %d", count) + } + }) + } +} diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index e5c79a1c149a..eae0f2ed350b 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -50,9 +50,13 @@ type leaseTest struct { kvDB *client.DB nodes map[uint32]*sql.LeaseManager leaseManagerTestingKnobs sql.LeaseManagerTestingKnobs + cfg *base.LeaseManagerConfig } func newLeaseTest(t *testing.T, params base.TestServerArgs) *leaseTest { + if params.LeaseManagerConfig == nil { + params.LeaseManagerConfig = base.NewLeaseManagerConfig() + } s, db, kvDB := serverutils.StartServer(t, params) leaseTest := &leaseTest{ T: t, @@ -60,6 +64,7 @@ func newLeaseTest(t *testing.T, params base.TestServerArgs) *leaseTest { db: db, kvDB: kvDB, nodes: map[uint32]*sql.LeaseManager{}, + cfg: params.LeaseManagerConfig, } if params.Knobs.SQLLeaseManager != nil { leaseTest.leaseManagerTestingKnobs = @@ -191,6 +196,7 @@ func (t *leaseTest) node(nodeID uint32) *sql.LeaseManager { t.leaseManagerTestingKnobs, t.server.Stopper(), &sql.MemoryMetrics{}, + t.cfg, ) t.nodes[nodeID] = mgr } @@ -223,8 +229,10 @@ func TestLeaseManager(testingT *testing.T) { // table and expiration.
l1, e1 := t.mustAcquire(1, descID) l2, e2 := t.mustAcquire(1, descID) - if l1.ID != l2.ID || e1 != e2 { + if l1.ID != l2.ID { t.Fatalf("expected same lease, but found %v != %v", l1, l2) + } else if e1 != e2 { + t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2) } t.expectLeases(descID, "/1/1") // Node 2 never acquired a lease on descID, so we should expect an error. @@ -303,6 +311,12 @@ func TestLeaseManager(testingT *testing.T) { func TestLeaseManagerReacquire(testingT *testing.T) { defer leaktest.AfterTest(testingT)() params, _ := createTestServerParams() + + params.LeaseManagerConfig = base.NewLeaseManagerConfig() + // Set the lease duration such that the next lease acquisition will + // require the lease to be reacquired. + params.LeaseManagerConfig.TableDescriptorLeaseDuration = 0 + removalTracker := sql.NewLeaseRemovalTracker() params.Knobs = base.TestingKnobs{ SQLLeaseManager: &sql.LeaseManagerTestingKnobs{ @@ -316,17 +330,6 @@ func TestLeaseManagerReacquire(testingT *testing.T) { const descID = keys.LeaseTableID - // Set the lease duration such that the next lease acquisition will - // require the lease to be reacquired. - savedLeaseDuration := sql.LeaseDuration - defer func() { - sql.LeaseDuration = savedLeaseDuration - }() - - sql.LeaseDuration = 5 * time.Nanosecond - - time.Sleep(5 * sql.LeaseDuration) - l1, e1 := t.mustAcquire(1, descID) t.expectLeases(descID, "/1/1") diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 4da6b0a71337..e19d24dc27f9 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -173,6 +173,7 @@ func TestSchemaChangeProcess(t *testing.T) { var id = sqlbase.ID(keys.MaxReservedDescID + 2) var node = roachpb.NodeID(2) stopper := stop.NewStopper() + cfg := base.NewLeaseManagerConfig() leaseMgr := sql.NewLeaseManager( &base.NodeIDContainer{}, *kvDB, @@ -180,6 +181,7 @@ func TestSchemaChangeProcess(t *testing.T) { sql.LeaseManagerTestingKnobs{}, stopper, &sql.MemoryMetrics{}, + cfg, ) jobRegistry := s.JobRegistry().(*jobs.Registry) defer stopper.Stop(context.TODO()) diff --git a/pkg/sql/txn_restart_test.go b/pkg/sql/txn_restart_test.go index 50a605ff4407..83f2b4c5363a 100644 --- a/pkg/sql/txn_restart_test.go +++ b/pkg/sql/txn_restart_test.go @@ -1263,7 +1263,7 @@ SELECT * from t.test WHERE k = 'test_key'; func TestReacquireLeaseOnRestart(t *testing.T) { defer leaktest.AfterTest(t)() - advancement := 2 * sql.LeaseDuration + advancement := 2 * base.DefaultTableDescriptorLeaseDuration var cmdFilters CommandFilters cmdFilters.AppendFilter(checkEndTransactionTrigger, true)
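To make the intent of the patch concrete, the following is a minimal standalone sketch, not part of the diff, of the two mechanisms it combines: the jittered lease duration and singleflight-coalesced acquisition. It assumes the public golang.org/x/sync/singleflight package (the vendored util/syncutil/singleflight exposes the same DoChan and Result API); the helper name jittered is invented for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"

	"golang.org/x/sync/singleflight"
)

// jittered mirrors LeaseStore.jitteredLeaseDuration: a duration drawn
// uniformly from [(1-jitter)*d, (1+jitter)*d].
func jittered(d time.Duration, jitter float64) time.Duration {
	return time.Duration(float64(d) * (1 - jitter + 2*jitter*rand.Float64()))
}

func main() {
	// With the defaults added in this patch (5m mean, 0.25 jitter), each
	// expiration lands between 3m45s and 6m15s, so leases acquired together
	// do not all come up for renewal together.
	for i := 0; i < 3; i++ {
		fmt.Println(jittered(5*time.Minute, 0.25))
	}

	// Coalesce concurrent acquisitions under one key, as tableState now does
	// with t.mu.group: while a flight is in progress, callers of DoChan with
	// the same key join it instead of issuing their own store round trip.
	var g singleflight.Group
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		ch := g.DoChan("acquire", func() (interface{}, error) {
			time.Sleep(10 * time.Millisecond) // stand-in for the KV txn
			exp := time.Now().Add(jittered(5*time.Minute, 0.25))
			return fmt.Sprintf("lease expiring at %s", exp), nil
		})
		wg.Add(1)
		go func() {
			defer wg.Done()
			res := <-ch // res.Shared reports whether the result was coalesced
			fmt.Printf("got %v (shared=%t)\n", res.Val, res.Shared)
		}()
	}
	wg.Wait()
}
```

The coalescing half is also a fair model of why acquireFreshestFromStoreLocked checks wasCalled: a caller that joins an in-flight DoChan receives a result computed from state that may predate its request, which is exactly the case the loop has to retry.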