diff --git a/pkg/base/config.go b/pkg/base/config.go index a4935086d792..349b543d9e32 100644 --- a/pkg/base/config.go +++ b/pkg/base/config.go @@ -79,12 +79,16 @@ const ( // the jitter fraction. Jittering is done to prevent multiple leases // from being renewed simultaneously if they were all acquired // simultaneously. - DefaultTableDescriptorLeaseDuration = 5 * time.Minute + DefaultTableDescriptorLeaseDuration = 2 * time.Minute // DefaultTableDescriptorLeaseJitterFraction is the default factor // that we use to randomly jitter the lease duration when acquiring a // new lease and the lease renewal timeout. DefaultTableDescriptorLeaseJitterFraction = 0.25 + + // DefaultTableDescriptorLeaseRenewalTimeout is the default time + // before a lease expires when acquisition to renew the lease begins. + DefaultTableDescriptorLeaseRenewalTimeout = time.Minute ) var defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt( @@ -489,6 +493,10 @@ type LeaseManagerConfig struct { // randomly jitter the lease duration when acquiring a new lease and // the lease renewal timeout. TableDescriptorLeaseJitterFraction float64 + + // TableDescriptorLeaseRenewalTimeout is the default time + // before a lease expires when acquisition to renew the lease begins. + TableDescriptorLeaseRenewalTimeout time.Duration } // NewLeaseManagerConfig initializes a LeaseManagerConfig with default values. 
@@ -496,5 +504,6 @@ func NewLeaseManagerConfig() *LeaseManagerConfig { return &LeaseManagerConfig{ TableDescriptorLeaseDuration: DefaultTableDescriptorLeaseDuration, TableDescriptorLeaseJitterFraction: DefaultTableDescriptorLeaseJitterFraction, + TableDescriptorLeaseRenewalTimeout: DefaultTableDescriptorLeaseRenewalTimeout, } } diff --git a/pkg/sql/lease.go b/pkg/sql/lease.go index 102789f35161..77f46ddb9616 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -22,6 +22,7 @@ import ( "sync/atomic" "time" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "golang.org/x/net/context" @@ -40,11 +41,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" ) -// TODO(pmattis): Periodically renew leases for tables that were used recently and -// for which the lease will expire soon. - // tableVersionState holds the state for a table version. This includes // the lease information for a table version. // TODO(vivek): A node only needs to manage lease information on what it @@ -122,6 +121,9 @@ type LeaseStore struct { // range of the actual lease duration will be // [(1-leaseJitterFraction) * leaseDuration, (1+leaseJitterFraction) * leaseDuration] leaseJitterFraction float64 + // leaseRenewalTimeout is the time before a lease expires when + // acquisition to renew the lease begins. + leaseRenewalTimeout time.Duration testingKnobs LeaseStoreTestingKnobs memMetrics *MemoryMetrics @@ -564,6 +566,11 @@ type tableState struct { tableNameCache *tableNameCache stopper *stop.Stopper + // renewalInProgress is an atomic indicator for when a renewal for a + // lease has begun. This is atomic to prevent multiple routines from + // entering renewal initialization. 
+ renewalInProgress int32 + mu struct { syncutil.Mutex @@ -614,6 +621,7 @@ func (t *tableState) acquire( return nil, result.Err } } + return t.findForTimestamp(ctx, timestamp, m) } @@ -970,6 +978,38 @@ func (t *tableState) purgeOldVersions( return err } +// renewalFn returns a closure used for renewing a lease. +// t.renewalInProgress must be set to 1 before calling. +func (t *tableState) renewalFn( + m *LeaseManager, tableVersion *tableVersionState, +) func(context.Context) { + return func(ctx context.Context) { + resultChan, _ := t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) { + var span opentracing.Span + ctx, span = tracing.ForkCtxSpan(ctx, "[async] lease renewal") + ctx = t.stopper.WithCancel(ctx) + defer tracing.FinishSpan(span) + log.VEventf(ctx, 1, + "lease acquisition beginning for tableID=%d tableName=%q", + t.id, tableVersion.TableDescriptor.Name) + token, err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}) + if err != nil { + log.Errorf(ctx, + "lease acquisition for tableID=%d tableName=%q failed: %s", + t.id, tableVersion.TableDescriptor.Name, err) + } else { + log.VEventf(ctx, 1, + "lease acquisition finished for tableID=%d tableName=%q", + t.id, tableVersion.TableDescriptor.Name) + } + return token, err + + }) + <-resultChan + atomic.StoreInt32(&t.renewalInProgress, 0) + } +} + // LeaseAcquireBlockType is the type of blocking result event when // calling LeaseAcquireResultBlockEvent. type LeaseAcquireBlockType int @@ -1166,6 +1206,7 @@ func NewLeaseManager( nodeID: nodeID, leaseDuration: cfg.TableDescriptorLeaseDuration, leaseJitterFraction: cfg.TableDescriptorLeaseJitterFraction, + leaseRenewalTimeout: cfg.TableDescriptorLeaseRenewalTimeout, testingKnobs: testingKnobs.LeaseStoreTestingKnobs, memMetrics: memMetrics, }, @@ -1192,7 +1233,9 @@ func nameMatchesTable(table *sqlbase.TableDescriptor, dbID sqlbase.ID, tableName // the timestamp. It returns the table descriptor and a expiration time. 
// A transaction using this descriptor must ensure that its // commit-timestamp < expiration-time. Care must be taken to not modify -// the returned descriptor. +// the returned descriptor. Renewal of a lease may begin in the +// background. Renewal is done in order to prevent blocking on future +// acquisitions. func (m *LeaseManager) AcquireByName( ctx context.Context, timestamp hlc.Timestamp, dbID sqlbase.ID, tableName string, ) (*sqlbase.TableDescriptor, hlc.Timestamp, error) { @@ -1200,6 +1243,17 @@ func (m *LeaseManager) AcquireByName( tableVersion := m.tableNames.get(dbID, tableName, timestamp) if tableVersion != nil { if !timestamp.Less(tableVersion.ModificationTime) { + + // Atomically check and begin a renewal if one has not already + // been set. + durationUntilExpiration := time.Duration(tableVersion.expiration.WallTime - hlc.UnixNano()) + durationUntilRenewal := durationUntilExpiration - m.LeaseStore.leaseRenewalTimeout + if t := m.findTableState(tableVersion.ID, false /* create */); t != nil && + durationUntilRenewal < 0 && atomic.CompareAndSwapInt32(&t.renewalInProgress, 0, 1) { + // Start the renewal. When it finishes, it will reset t.renewalInProgress. + t.stopper.RunWorker(context.Background(), t.renewalFn(m, tableVersion)) + } + return &tableVersion.TableDescriptor, tableVersion.expiration, nil } if err := m.Release(&tableVersion.TableDescriptor); err != nil { diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index 7268b242b05f..a287a94d7638 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -185,6 +185,9 @@ func (t *leaseTest) mustPublish(ctx context.Context, nodeID uint32, descID sqlba } } +// node gets a LeaseManager corresponding to a mock node. A new lease +// manager is initialized for each node. This allows for more complex +// inter-node lease testing. 
func (t *leaseTest) node(nodeID uint32) *sql.LeaseManager { mgr := t.nodes[nodeID] if mgr == nil { @@ -928,7 +931,12 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _, _, err := leaseManager.AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName) + _, _, err := leaseManager.AcquireByName( + context.TODO(), + t.server.Clock().Now(), + dbID, + tableName, + ) if err != nil { b.Fatal(err) } @@ -936,3 +944,96 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); }) } + +// This test makes sure leases get renewed automatically in the +// background if the lease is about to expire, without blocking. We +// first acquire a lease, then continue to re-acquire it until another +// lease is renewed. +func TestLeaseRenewedAutomatically(testingT *testing.T) { + defer leaktest.AfterTest(testingT)() + + var testAcquiredCount int32 + var testAcquisitionBlockCount int32 + + params, _ := createTestServerParams() + params.Knobs = base.TestingKnobs{ + SQLLeaseManager: &sql.LeaseManagerTestingKnobs{ + LeaseStoreTestingKnobs: sql.LeaseStoreTestingKnobs{ + // We want to track when leases get acquired and when they are renewed. + // We also want to know when acquiring blocks to test lease renewal. + LeaseAcquiredEvent: func(_ sqlbase.TableDescriptor, _ error) { + + atomic.AddInt32(&testAcquiredCount, 1) + }, + LeaseAcquireResultBlockEvent: func(_ sql.LeaseAcquireBlockType) { + atomic.AddInt32(&testAcquisitionBlockCount, 1) + }, + }, + }, + } + params.LeaseManagerConfig = base.NewLeaseManagerConfig() + // The lease jitter is set to ensure newer leases have higher + // expiration timestamps. + params.LeaseManagerConfig.TableDescriptorLeaseJitterFraction = 0.0 + // The renewal timeout is set to be the duration, so background + // renewal should begin immediately after accessing a lease. 
+ params.LeaseManagerConfig.TableDescriptorLeaseRenewalTimeout = + params.LeaseManagerConfig.TableDescriptorLeaseDuration + + t := newLeaseTest(testingT, params) + defer t.cleanup() + + if _, err := t.db.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); +`); err != nil { + t.Fatal(err) + } + + tableDesc := sqlbase.GetTableDescriptor(t.kvDB, "t", "test") + dbID := tableDesc.ParentID + tableName := tableDesc.Name + + // Acquire the first lease. + ts, e1, err := t.node(1).AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName) + if err != nil { + t.Fatal(err) + } else if err := t.release(1, ts); err != nil { + t.Fatal(err) + } else if count := atomic.LoadInt32(&testAcquiredCount); count != 1 { + t.Fatalf("expected 1 lease to be acquired, but acquired %d times", + count) + } + + // Reset testAcquisitionBlockCount as the first acquisition will always block. + atomic.StoreInt32(&testAcquisitionBlockCount, 0) + + testutils.SucceedsSoon(t, func() error { + // Acquire another lease. At first this will be the same lease, but + // eventually we will asynchronously renew a lease and our acquire will get + // a newer lease. + ts, e2, err := t.node(1).AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := t.release(1, ts); err != nil { + t.Fatal(err) + } + }() + + // We check for the new expiry time because if our past acquire triggered + // the background renewal, the next lease we get will be the result of the + // background renewal. 
+ if e2.WallTime <= e1.WallTime { + return errors.Errorf("expected new lease expiration (%s) to be after old lease expiration (%s)", + e2, e1) + } else if count := atomic.LoadInt32(&testAcquiredCount); count < 2 { + return errors.Errorf("expected at least 2 leases to be acquired, but acquired %d times", + count) + } else if blockCount := atomic.LoadInt32(&testAcquisitionBlockCount); blockCount > 0 { + t.Fatalf("expected repeated lease acquisition to not block, but blockCount is: %d", blockCount) + } + return nil + }) +}