From 471fc16d20675ec5229084b1559228ccab46a25a Mon Sep 17 00:00:00 2001 From: Joey Pereira Date: Tue, 17 Oct 2017 23:11:43 -0400 Subject: [PATCH 1/2] sql: Add benchmark for LeaseManager.AcquireByName with cached leases. LeaseManager.AcquireByName is a critical path for statements and generally when accessing the table descriptor. The case when the lease is cached serves the majority of operations. --- pkg/sql/lease_test.go | 50 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index eae0f2ed350b..7268b242b05f 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -44,7 +44,7 @@ import ( ) type leaseTest struct { - *testing.T + testing.TB server serverutils.TestServerInterface db *gosql.DB kvDB *client.DB @@ -53,13 +53,13 @@ type leaseTest struct { cfg *base.LeaseManagerConfig } -func newLeaseTest(t *testing.T, params base.TestServerArgs) *leaseTest { +func newLeaseTest(tb testing.TB, params base.TestServerArgs) *leaseTest { if params.LeaseManagerConfig == nil { params.LeaseManagerConfig = base.NewLeaseManagerConfig() } - s, db, kvDB := serverutils.StartServer(t, params) + s, db, kvDB := serverutils.StartServer(tb, params) leaseTest := &leaseTest{ - T: t, + TB: tb, server: s, db: db, kvDB: kvDB, @@ -894,3 +894,45 @@ INSERT INTO t.timestamp VALUES ('a', 'b'); t.Fatal(err) } } + +// BenchmarkLeaseAcquireByNameCached benchmarks the AcquireByName +// acquisition code path if a valid lease exists and is contained in +// tableNameCache. In particular this benchmark is done with +// parallelism, which is important to also benchmark locking. 
+func BenchmarkLeaseAcquireByNameCached(b *testing.B) { + defer leaktest.AfterTest(b)() + params, _ := createTestServerParams() + + t := newLeaseTest(b, params) + defer t.cleanup() + + if _, err := t.db.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); +`); err != nil { + t.Fatal(err) + } + + tableDesc := sqlbase.GetTableDescriptor(t.kvDB, "t", "test") + dbID := tableDesc.ParentID + tableName := tableDesc.Name + leaseManager := t.node(1) + + // Acquire the lease so it is put into the tableNameCache. + _, _, err := leaseManager.AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName) + if err != nil { + t.Fatal(err) + } + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, _, err := leaseManager.AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName) + if err != nil { + b.Fatal(err) + } + } + }) + +} From d7b794219aac6483148b7ff534a61648e5a54810 Mon Sep 17 00:00:00 2001 From: Joey Pereira Date: Tue, 17 Oct 2017 22:23:07 -0400 Subject: [PATCH 2/2] sql: Add background renewal for table descriptor leases This commit changes TableDescriptor lease acquisition to acquire a new lease if the current lease is about to expire. This prevents routines from blocking when there are no valid leases on a frequently accessed table. If renewal continuously fails, the error will become user-facing if no lease is acquired yet by the time a lease is expired. Closes #17227. --- pkg/base/config.go | 9 +++ pkg/server/server.go | 12 +++- pkg/sql/lease.go | 69 +++++++++++++++++++-- pkg/sql/lease_test.go | 106 ++++++++++++++++++++++++++++++++- pkg/sql/schema_changer_test.go | 3 + 5 files changed, 191 insertions(+), 8 deletions(-) diff --git a/pkg/base/config.go b/pkg/base/config.go index a4935086d792..752bd4f1ae53 100644 --- a/pkg/base/config.go +++ b/pkg/base/config.go @@ -85,6 +85,10 @@ const ( // that we use to randomly jitter the lease duration when acquiring a // new lease and the lease renewal timeout. 
DefaultTableDescriptorLeaseJitterFraction = 0.25 + + // DefaultTableDescriptorLeaseRenewalTimeout is the default time + // before a lease expires when acquisition to renew the lease begins. + DefaultTableDescriptorLeaseRenewalTimeout = time.Minute ) var defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt( @@ -489,6 +493,10 @@ type LeaseManagerConfig struct { // randomly jitter the lease duration when acquiring a new lease and // the lease renewal timeout. TableDescriptorLeaseJitterFraction float64 + + // TableDescriptorLeaseRenewalTimeout is the time + // before a lease expires when acquisition to renew the lease begins. + TableDescriptorLeaseRenewalTimeout time.Duration } // NewLeaseManagerConfig initializes a LeaseManagerConfig with default values. @@ -496,5 +504,6 @@ func NewLeaseManagerConfig() *LeaseManagerConfig { return &LeaseManagerConfig{ TableDescriptorLeaseDuration: DefaultTableDescriptorLeaseDuration, TableDescriptorLeaseJitterFraction: DefaultTableDescriptorLeaseJitterFraction, + TableDescriptorLeaseRenewalTimeout: DefaultTableDescriptorLeaseRenewalTimeout, } } diff --git a/pkg/server/server.go b/pkg/server/server.go index c9005fbcd67d..99c65f232c89 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -276,8 +276,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { if leaseManagerTestingKnobs := cfg.TestingKnobs.SQLLeaseManager; leaseManagerTestingKnobs != nil { lmKnobs = *leaseManagerTestingKnobs.(*sql.LeaseManagerTestingKnobs) } - s.leaseMgr = sql.NewLeaseManager(&s.nodeIDContainer, *s.db, s.clock, lmKnobs, - s.stopper, &s.internalMemMetrics, s.cfg.LeaseManagerConfig) + s.leaseMgr = sql.NewLeaseManager( + s.cfg.AmbientCtx, + &s.nodeIDContainer, + *s.db, + s.clock, + lmKnobs, + s.stopper, + &s.internalMemMetrics, + s.cfg.LeaseManagerConfig, + ) s.leaseMgr.RefreshLeases(s.stopper, s.db, s.gossip) // We do not set memory monitors or a noteworthy limit because the children of diff --git 
a/pkg/sql/lease.go b/pkg/sql/lease.go index 102789f35161..829308bce1e6 100644 --- a/pkg/sql/lease.go +++ b/pkg/sql/lease.go @@ -40,11 +40,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" ) -// TODO(pmattis): Periodically renew leases for tables that were used recently and -// for which the lease will expire soon. - // tableVersionState holds the state for a table version. This includes // the lease information for a table version. // TODO(vivek): A node only needs to manage lease information on what it @@ -122,6 +120,9 @@ type LeaseStore struct { // range of the actual lease duration will be // [(1-leaseJitterFraction) * leaseDuration, (1+leaseJitterFraction) * leaseDuration] leaseJitterFraction float64 + // leaseRenewalTimeout is the time before a lease expires when + // acquisition to renew the lease begins. + leaseRenewalTimeout time.Duration testingKnobs LeaseStoreTestingKnobs memMetrics *MemoryMetrics @@ -564,6 +565,11 @@ type tableState struct { tableNameCache *tableNameCache stopper *stop.Stopper + // renewalInProgress is an atomic indicator for when a renewal for a + // lease has begun. This is atomic to prevent multiple routines from + // entering renewal initialization. + renewalInProgress int32 + mu struct { syncutil.Mutex @@ -614,6 +620,7 @@ func (t *tableState) acquire( return nil, result.Err } } + return t.findForTimestamp(ctx, timestamp, m) } @@ -970,6 +977,33 @@ func (t *tableState) purgeOldVersions( return err } +// startLeaseRenewal starts a singleflight.Group to acquire a lease. +// This function blocks until lease acquisition completes. +// t.renewalInProgress must be set to 1 before calling. 
+func (t *tableState) startLeaseRenewal( + ctx context.Context, m *LeaseManager, tableVersion *tableVersionState, +) { + resultChan, _ := t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) { + log.VEventf(ctx, 1, + "background lease renewal beginning for tableID=%d tableName=%q", + t.id, tableVersion.TableDescriptor.Name) + token, err := t.acquireNodeLease(ctx, m, hlc.Timestamp{}) + if err != nil { + log.Errorf(ctx, + "background lease renewal for tableID=%d tableName=%q failed: %s", + t.id, tableVersion.TableDescriptor.Name, err) + } else { + log.VEventf(ctx, 1, + "background lease renewal finished for tableID=%d tableName=%q", + t.id, tableVersion.TableDescriptor.Name) + } + return token, err + + }) + <-resultChan + atomic.StoreInt32(&t.renewalInProgress, 0) +} + // LeaseAcquireBlockType is the type of blocking result event when // calling LeaseAcquireResultBlockEvent. type LeaseAcquireBlockType int @@ -1144,6 +1178,7 @@ type LeaseManager struct { // Not protected by mu. tableNames tableNameCache testingKnobs LeaseManagerTestingKnobs + ambientCtx log.AmbientContext stopper *stop.Stopper } @@ -1151,6 +1186,7 @@ type LeaseManager struct { // // stopper is used to run async tasks. Can be nil in tests. func NewLeaseManager( + ambientCtx log.AmbientContext, nodeID *base.NodeIDContainer, db client.DB, clock *hlc.Clock, @@ -1166,6 +1202,7 @@ func NewLeaseManager( nodeID: nodeID, leaseDuration: cfg.TableDescriptorLeaseDuration, leaseJitterFraction: cfg.TableDescriptorLeaseJitterFraction, + leaseRenewalTimeout: cfg.TableDescriptorLeaseRenewalTimeout, testingKnobs: testingKnobs.LeaseStoreTestingKnobs, memMetrics: memMetrics, }, @@ -1173,7 +1210,8 @@ func NewLeaseManager( tableNames: tableNameCache{ tables: make(map[tableNameCacheKey]*tableVersionState), }, - stopper: stopper, + ambientCtx: ambientCtx, + stopper: stopper, } lm.mu.Lock() @@ -1192,7 +1230,9 @@ func nameMatchesTable(table *sqlbase.TableDescriptor, dbID sqlbase.ID, tableName // the timestamp. 
It returns the table descriptor and a expiration time. // A transaction using this descriptor must ensure that its // commit-timestamp < expiration-time. Care must be taken to not modify -// the returned descriptor. +// the returned descriptor. Renewal of a lease may begin in the +// background. Renewal is done in order to prevent blocking on future +// acquisitions. func (m *LeaseManager) AcquireByName( ctx context.Context, timestamp hlc.Timestamp, dbID sqlbase.ID, tableName string, ) (*sqlbase.TableDescriptor, hlc.Timestamp, error) { @@ -1200,6 +1240,25 @@ func (m *LeaseManager) AcquireByName( tableVersion := m.tableNames.get(dbID, tableName, timestamp) if tableVersion != nil { if !timestamp.Less(tableVersion.ModificationTime) { + + // Atomically check and begin a renewal if one has not already + // been set. + + durationUntilExpiry := time.Duration(tableVersion.expiration.WallTime - timestamp.WallTime) + if durationUntilExpiry < m.LeaseStore.leaseRenewalTimeout { + if t := m.findTableState(tableVersion.ID, false /* create */); t != nil && + atomic.CompareAndSwapInt32(&t.renewalInProgress, 0, 1) { + // Start the renewal. When it finishes, it will reset t.renewalInProgress. 
+ if err := t.stopper.RunAsyncTask(context.Background(), + "lease renewal", func(ctx context.Context) { + ctx, _ = tracing.EnsureContext(ctx, m.ambientCtx.Tracer, "lease renewal") + t.startLeaseRenewal(ctx, m, tableVersion) + }); err != nil { + return &tableVersion.TableDescriptor, tableVersion.expiration, err + } + } + } + return &tableVersion.TableDescriptor, tableVersion.expiration, nil } if err := m.Release(&tableVersion.TableDescriptor); err != nil { diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index 7268b242b05f..b0a008827217 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -39,7 +39,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/pkg/errors" ) @@ -185,12 +187,16 @@ func (t *leaseTest) mustPublish(ctx context.Context, nodeID uint32, descID sqlba } } +// node gets a LeaseManager corresponding to a mock node. A new lease +// manager is initialized for each node. This allows for more complex +// inter-node lease testing. 
func (t *leaseTest) node(nodeID uint32) *sql.LeaseManager { mgr := t.nodes[nodeID] if mgr == nil { nc := &base.NodeIDContainer{} nc.Set(context.TODO(), roachpb.NodeID(nodeID)) mgr = sql.NewLeaseManager( + log.AmbientContext{Tracer: tracing.NewTracer()}, nc, *t.kvDB, t.server.Clock(), t.leaseManagerTestingKnobs, @@ -928,7 +934,12 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _, _, err := leaseManager.AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName) + _, _, err := leaseManager.AcquireByName( + context.TODO(), + t.server.Clock().Now(), + dbID, + tableName, + ) if err != nil { b.Fatal(err) } @@ -936,3 +947,96 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); }) } + +// This test makes sure leases get renewed automatically in the +// background if the lease is about to expire, without blocking. We +// first acquire a lease, then continue to re-acquire it until another +// lease is renewed. +func TestLeaseRenewedAutomatically(testingT *testing.T) { + defer leaktest.AfterTest(testingT)() + + var testAcquiredCount int32 + var testAcquisitionBlockCount int32 + + params, _ := createTestServerParams() + params.Knobs = base.TestingKnobs{ + SQLLeaseManager: &sql.LeaseManagerTestingKnobs{ + LeaseStoreTestingKnobs: sql.LeaseStoreTestingKnobs{ + // We want to track when leases get acquired and when they are renewed. + // We also want to know when acquiring blocks to test lease renewal. + LeaseAcquiredEvent: func(_ sqlbase.TableDescriptor, _ error) { + + atomic.AddInt32(&testAcquiredCount, 1) + }, + LeaseAcquireResultBlockEvent: func(_ sql.LeaseAcquireBlockType) { + atomic.AddInt32(&testAcquisitionBlockCount, 1) + }, + }, + }, + } + params.LeaseManagerConfig = base.NewLeaseManagerConfig() + // The lease jitter is set to ensure newer leases have higher + // expiration timestamps. 
+ params.LeaseManagerConfig.TableDescriptorLeaseJitterFraction = 0.0 + // The renewal timeout is set to be the duration, so background + // renewal should begin immediately after accessing a lease. + params.LeaseManagerConfig.TableDescriptorLeaseRenewalTimeout = + params.LeaseManagerConfig.TableDescriptorLeaseDuration + + t := newLeaseTest(testingT, params) + defer t.cleanup() + + if _, err := t.db.Exec(` +CREATE DATABASE t; +CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR); +`); err != nil { + t.Fatal(err) + } + + tableDesc := sqlbase.GetTableDescriptor(t.kvDB, "t", "test") + dbID := tableDesc.ParentID + tableName := tableDesc.Name + + // Acquire the first lease. + ts, e1, err := t.node(1).AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName) + if err != nil { + t.Fatal(err) + } else if err := t.release(1, ts); err != nil { + t.Fatal(err) + } else if count := atomic.LoadInt32(&testAcquiredCount); count != 1 { + t.Fatalf("expected 1 lease to be acquired, but acquired %d times", + count) + } + + // Reset testAcquisitionBlockCount as the first acquisition will always block. + atomic.StoreInt32(&testAcquisitionBlockCount, 0) + + testutils.SucceedsSoon(t, func() error { + // Acquire another lease. At first this will be the same lease, but + // eventually we will asynchronously renew a lease and our acquire will get + // a newer lease. + ts, e2, err := t.node(1).AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := t.release(1, ts); err != nil { + t.Fatal(err) + } + }() + + // We check for the new expiry time because if our past acquire triggered + // the background renewal, the next lease we get will be the result of the + // background renewal. 
+ if e2.WallTime <= e1.WallTime { + return errors.Errorf("expected new lease expiration (%s) to be after old lease expiration (%s)", + e2, e1) + } else if count := atomic.LoadInt32(&testAcquiredCount); count < 2 { + return errors.Errorf("expected at least 2 leases to be acquired, but acquired %d times", + count) + } else if blockCount := atomic.LoadInt32(&testAcquisitionBlockCount); blockCount > 0 { + t.Fatalf("expected repeated lease acquisition to not block, but blockCount is: %d", blockCount) + } + return nil + }) +} diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index e19d24dc27f9..0de2addee27c 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -40,11 +40,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" ) // asyncSchemaChangerDisabled can be used to disable asynchronous processing @@ -175,6 +177,7 @@ func TestSchemaChangeProcess(t *testing.T) { stopper := stop.NewStopper() cfg := base.NewLeaseManagerConfig() leaseMgr := sql.NewLeaseManager( + log.AmbientContext{Tracer: tracing.NewTracer()}, &base.NodeIDContainer{}, *kvDB, hlc.NewClock(hlc.UnixNano, time.Nanosecond),