sql: Add background renewal for table descriptor leases #19005

Merged · 2 commits · Oct 24, 2017
9 changes: 9 additions & 0 deletions pkg/base/config.go
@@ -85,6 +85,10 @@ const (
// that we use to randomly jitter the lease duration when acquiring a
// new lease and the lease renewal timeout.
DefaultTableDescriptorLeaseJitterFraction = 0.25

// DefaultTableDescriptorLeaseRenewalTimeout is the default time
// before a lease expires when acquisition to renew the lease begins.
DefaultTableDescriptorLeaseRenewalTimeout = time.Minute
)

var defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
@@ -489,12 +493,17 @@ type LeaseManagerConfig struct {
// randomly jitter the lease duration when acquiring a new lease and
// the lease renewal timeout.
TableDescriptorLeaseJitterFraction float64

// TableDescriptorLeaseRenewalTimeout is the time
// before a lease expires when acquisition to renew the lease begins.
TableDescriptorLeaseRenewalTimeout time.Duration
}

// NewLeaseManagerConfig initializes a LeaseManagerConfig with default values.
func NewLeaseManagerConfig() *LeaseManagerConfig {
return &LeaseManagerConfig{
TableDescriptorLeaseDuration: DefaultTableDescriptorLeaseDuration,
TableDescriptorLeaseJitterFraction: DefaultTableDescriptorLeaseJitterFraction,
TableDescriptorLeaseRenewalTimeout: DefaultTableDescriptorLeaseRenewalTimeout,
}
}
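
A note on how the new knob is meant to be read: renewal is triggered once the remaining lifetime of a cached lease falls below TableDescriptorLeaseRenewalTimeout, so with the one-minute default a lease that lives for, say, five minutes starts renewing in its final minute. The standalone sketch below (not part of the diff) illustrates just that relationship; shouldRenewLease and the concrete durations are assumptions for the example, not identifiers from this change.

package main

import (
	"fmt"
	"time"
)

// shouldRenewLease is a hypothetical helper, not part of this PR: it spells out
// when the background renewal is meant to kick in, namely once the remaining
// lifetime of the lease drops below the renewal timeout.
func shouldRenewLease(expiration, now time.Time, renewalTimeout time.Duration) bool {
	return expiration.Sub(now) < renewalTimeout
}

func main() {
	// Illustrative values: a five-minute lease with the one-minute default
	// renewal timeout added by this change.
	leaseDuration := 5 * time.Minute
	renewalTimeout := time.Minute

	acquired := time.Now()
	expiration := acquired.Add(leaseDuration)

	// One minute into the lease: four minutes remain, no renewal yet.
	fmt.Println(shouldRenewLease(expiration, acquired.Add(1*time.Minute), renewalTimeout)) // false
	// Four and a half minutes in: only 30s remain, renewal should start.
	fmt.Println(shouldRenewLease(expiration, acquired.Add(4*time.Minute+30*time.Second), renewalTimeout)) // true
}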
12 changes: 10 additions & 2 deletions pkg/server/server.go
@@ -276,8 +276,16 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
if leaseManagerTestingKnobs := cfg.TestingKnobs.SQLLeaseManager; leaseManagerTestingKnobs != nil {
lmKnobs = *leaseManagerTestingKnobs.(*sql.LeaseManagerTestingKnobs)
}
s.leaseMgr = sql.NewLeaseManager(&s.nodeIDContainer, *s.db, s.clock, lmKnobs,
s.stopper, &s.internalMemMetrics, s.cfg.LeaseManagerConfig)
s.leaseMgr = sql.NewLeaseManager(
s.cfg.AmbientCtx,
&s.nodeIDContainer,
*s.db,
s.clock,
lmKnobs,
s.stopper,
&s.internalMemMetrics,
s.cfg.LeaseManagerConfig,
)
s.leaseMgr.RefreshLeases(s.stopper, s.db, s.gossip)

// We do not set memory monitors or a noteworthy limit because the children of
69 changes: 64 additions & 5 deletions pkg/sql/lease.go
@@ -40,11 +40,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// TODO(pmattis): Periodically renew leases for tables that were used recently and
// for which the lease will expire soon.

// tableVersionState holds the state for a table version. This includes
// the lease information for a table version.
// TODO(vivek): A node only needs to manage lease information on what it
@@ -122,6 +120,9 @@ type LeaseStore struct {
// range of the actual lease duration will be
// [(1-leaseJitterFraction) * leaseDuration, (1+leaseJitterFraction) * leaseDuration]
leaseJitterFraction float64
// leaseRenewalTimeout is the time before a lease expires when
// acquisition to renew the lease begins.
leaseRenewalTimeout time.Duration

testingKnobs LeaseStoreTestingKnobs
memMetrics *MemoryMetrics
@@ -564,6 +565,11 @@ type tableState struct {
tableNameCache *tableNameCache
stopper *stop.Stopper

// renewalInProgress is an atomic indicator for when a renewal for a
// lease has begun. This is atomic to prevent multiple routines from
// entering renewal initialization.
renewalInProgress int32

mu struct {
syncutil.Mutex

@@ -614,6 +620,7 @@ func (t *tableState) acquire(
return nil, result.Err
}
}

return t.findForTimestamp(ctx, timestamp, m)
}

@@ -970,6 +977,33 @@ func (t *tableState) purgeOldVersions(
return err
}

// startLeaseRenewal starts a singleflight.Group to acquire a lease.
// This function blocks until lease acquisition completes.
// t.renewalInProgress must be set to 1 before calling.
func (t *tableState) startLeaseRenewal(
ctx context.Context, m *LeaseManager, tableVersion *tableVersionState,
) {
resultChan, _ := t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) {
log.VEventf(ctx, 1,
"background lease renewal beginning for tableID=%d tableName=%q",
t.id, tableVersion.TableDescriptor.Name)
token, err := t.acquireNodeLease(ctx, m, hlc.Timestamp{})
if err != nil {
log.Errorf(ctx,
"background lease renewal for tableID=%d tableName=%q failed: %s",
t.id, tableVersion.TableDescriptor.Name, err)
} else {
log.VEventf(ctx, 1,
"background lease renewal finished for tableID=%d tableName=%q",
t.id, tableVersion.TableDescriptor.Name)
}
return token, err

})
<-resultChan
atomic.StoreInt32(&t.renewalInProgress, 0)
}

// LeaseAcquireBlockType is the type of blocking result event when
// calling LeaseAcquireResultBlockEvent.
type LeaseAcquireBlockType int
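
Editorial sketch of the concurrency pattern used by startLeaseRenewal and its caller: a compare-and-swap on renewalInProgress elects a single goroutine to kick off the renewal, and the singleflight group above collapses concurrent acquisitions into one. Stripped of the lease machinery, the flag half of that pattern looks roughly like the following; renewer and maybeStartRenewal are hypothetical names introduced only for this example.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// renewer loosely mirrors tableState.renewalInProgress: the CAS below lets at
// most one goroutine launch a background renewal at a time, while every other
// caller carries on with the lease it already holds.
type renewer struct {
	renewalInProgress int32
}

// maybeStartRenewal is a hypothetical stand-in for the check done on the
// acquisition path; renew is whatever blocking work actually refreshes the lease.
func (r *renewer) maybeStartRenewal(renew func()) {
	if atomic.CompareAndSwapInt32(&r.renewalInProgress, 0, 1) {
		go func() {
			// Mirror startLeaseRenewal: clear the flag once the renewal is
			// done so a later acquisition can trigger the next one.
			defer atomic.StoreInt32(&r.renewalInProgress, 0)
			renew()
		}()
	}
}

func main() {
	var r renewer
	var started int32
	var wg sync.WaitGroup

	// Many concurrent acquisitions notice the lease is close to expiring;
	// only one of them should actually kick off a renewal.
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.maybeStartRenewal(func() {
				atomic.AddInt32(&started, 1)
				time.Sleep(10 * time.Millisecond) // pretend to renew the lease
			})
		}()
	}
	wg.Wait()
	time.Sleep(50 * time.Millisecond)
	fmt.Println("renewals started:", atomic.LoadInt32(&started)) // typically 1
}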
@@ -1144,13 +1178,15 @@ type LeaseManager struct {
// Not protected by mu.
tableNames tableNameCache
testingKnobs LeaseManagerTestingKnobs
ambientCtx log.AmbientContext
stopper *stop.Stopper
}

// NewLeaseManager creates a new LeaseManager.
//
// stopper is used to run async tasks. Can be nil in tests.
func NewLeaseManager(
ambientCtx log.AmbientContext,
nodeID *base.NodeIDContainer,
db client.DB,
clock *hlc.Clock,
@@ -1166,14 +1202,16 @@ func NewLeaseManager(
nodeID: nodeID,
leaseDuration: cfg.TableDescriptorLeaseDuration,
leaseJitterFraction: cfg.TableDescriptorLeaseJitterFraction,
leaseRenewalTimeout: cfg.TableDescriptorLeaseRenewalTimeout,
testingKnobs: testingKnobs.LeaseStoreTestingKnobs,
memMetrics: memMetrics,
},
testingKnobs: testingKnobs,
tableNames: tableNameCache{
tables: make(map[tableNameCacheKey]*tableVersionState),
},
stopper: stopper,
ambientCtx: ambientCtx,
stopper: stopper,
}

lm.mu.Lock()
@@ -1192,14 +1230,35 @@ func nameMatchesTable(table *sqlbase.TableDescriptor, dbID sqlbase.ID, tableName
// the timestamp. It returns the table descriptor and an expiration time.
// A transaction using this descriptor must ensure that its
// commit-timestamp < expiration-time. Care must be taken to not modify
// the returned descriptor.
// the returned descriptor. Renewal of a lease may begin in the
// background. Renewal is done in order to prevent blocking on future
// acquisitions.
func (m *LeaseManager) AcquireByName(
ctx context.Context, timestamp hlc.Timestamp, dbID sqlbase.ID, tableName string,
) (*sqlbase.TableDescriptor, hlc.Timestamp, error) {
// Check if we have cached an ID for this name.
tableVersion := m.tableNames.get(dbID, tableName, timestamp)
if tableVersion != nil {
if !timestamp.Less(tableVersion.ModificationTime) {

// Atomically check and begin a renewal if one has not already
// been set.

durationUntilExpiry := time.Duration(tableVersion.expiration.WallTime - timestamp.WallTime)
if durationUntilExpiry < m.LeaseStore.leaseRenewalTimeout {
if t := m.findTableState(tableVersion.ID, false /* create */); t != nil &&
atomic.CompareAndSwapInt32(&t.renewalInProgress, 0, 1) {
// Start the renewal. When it finishes, it will reset t.renewalInProgress.
if err := t.stopper.RunAsyncTask(context.Background(),
"lease renewal", func(ctx context.Context) {
ctx, _ = tracing.EnsureContext(ctx, m.ambientCtx.Tracer, "lease renewal")
t.startLeaseRenewal(ctx, m, tableVersion)
}); err != nil {
return &tableVersion.TableDescriptor, tableVersion.expiration, err
}
}
}

return &tableVersion.TableDescriptor, tableVersion.expiration, nil
}
if err := m.Release(&tableVersion.TableDescriptor); err != nil {
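
One more editorial note on the check in AcquireByName: hlc timestamps store their wall clock in nanoseconds, so the difference of two WallTime values converts directly into a time.Duration that can be compared against leaseRenewalTimeout. A minimal sketch, with made-up nanosecond values standing in for hlc.Timestamp:

package main

import (
	"fmt"
	"time"
)

func main() {
	// WallTime values are Unix nanoseconds, as in hlc.Timestamp; these
	// particular numbers are invented for illustration.
	var (
		nowWallTime        = time.Date(2017, 10, 24, 12, 0, 0, 0, time.UTC).UnixNano()
		expirationWallTime = time.Date(2017, 10, 24, 12, 0, 40, 0, time.UTC).UnixNano()
		renewalTimeout     = time.Minute
	)

	// Same arithmetic as the diff: the difference of two wall times is a
	// nanosecond count, i.e. a time.Duration.
	durationUntilExpiry := time.Duration(expirationWallTime - nowWallTime)
	fmt.Println(durationUntilExpiry)                  // 40s
	fmt.Println(durationUntilExpiry < renewalTimeout) // true: renewal would be triggered
}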
154 changes: 150 additions & 4 deletions pkg/sql/lease_test.go
@@ -39,12 +39,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/pkg/errors"
)

type leaseTest struct {
*testing.T
testing.TB
server serverutils.TestServerInterface
db *gosql.DB
kvDB *client.DB
@@ -53,13 +55,13 @@ type leaseTest struct {
cfg *base.LeaseManagerConfig
}

func newLeaseTest(t *testing.T, params base.TestServerArgs) *leaseTest {
func newLeaseTest(tb testing.TB, params base.TestServerArgs) *leaseTest {
if params.LeaseManagerConfig == nil {
params.LeaseManagerConfig = base.NewLeaseManagerConfig()
}
s, db, kvDB := serverutils.StartServer(t, params)
s, db, kvDB := serverutils.StartServer(tb, params)
leaseTest := &leaseTest{
T: t,
TB: tb,
server: s,
db: db,
kvDB: kvDB,
@@ -185,12 +187,16 @@ func (t *leaseTest) mustPublish(ctx context.Context, nodeID uint32, descID sqlba
}
}

// node gets a LeaseManager corresponding to a mock node. A new lease
// manager is initialized for each node. This allows for more complex
// inter-node lease testing.
func (t *leaseTest) node(nodeID uint32) *sql.LeaseManager {
mgr := t.nodes[nodeID]
if mgr == nil {
nc := &base.NodeIDContainer{}
nc.Set(context.TODO(), roachpb.NodeID(nodeID))
mgr = sql.NewLeaseManager(
log.AmbientContext{Tracer: tracing.NewTracer()},
nc, *t.kvDB,
t.server.Clock(),
t.leaseManagerTestingKnobs,
@@ -894,3 +900,143 @@ INSERT INTO t.timestamp VALUES ('a', 'b');
t.Fatal(err)
}
}

// BenchmarkLeaseAcquireByNameCached benchmarks the AcquireByName
// acquisition code path if a valid lease exists and is contained in
// tableNameCache. In particular this benchmark is done with
// parallelism, which is important to also benchmark locking.
func BenchmarkLeaseAcquireByNameCached(b *testing.B) {
defer leaktest.AfterTest(b)()
params, _ := createTestServerParams()

t := newLeaseTest(b, params)
defer t.cleanup()

if _, err := t.db.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
`); err != nil {
t.Fatal(err)
}

tableDesc := sqlbase.GetTableDescriptor(t.kvDB, "t", "test")
dbID := tableDesc.ParentID
tableName := tableDesc.Name
leaseManager := t.node(1)

// Acquire the lease so it is put into the tableNameCache.
_, _, err := leaseManager.AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName)
if err != nil {
t.Fatal(err)
}

b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, _, err := leaseManager.AcquireByName(
context.TODO(),
t.server.Clock().Now(),
dbID,
tableName,
)
if err != nil {
b.Fatal(err)
}
}
})

}

// This test makes sure leases get renewed automatically in the
// background if the lease is about to expire, without blocking. We
// first acquire a lease, then keep re-acquiring it until the background
// renewal produces a lease with a later expiration.
func TestLeaseRenewedAutomatically(testingT *testing.T) {
defer leaktest.AfterTest(testingT)()

var testAcquiredCount int32
var testAcquisitionBlockCount int32

params, _ := createTestServerParams()
params.Knobs = base.TestingKnobs{
SQLLeaseManager: &sql.LeaseManagerTestingKnobs{
LeaseStoreTestingKnobs: sql.LeaseStoreTestingKnobs{
// We want to track when leases get acquired and when they are renewed.
// We also want to know when acquiring blocks to test lease renewal.
LeaseAcquiredEvent: func(_ sqlbase.TableDescriptor, _ error) {

atomic.AddInt32(&testAcquiredCount, 1)
},
LeaseAcquireResultBlockEvent: func(_ sql.LeaseAcquireBlockType) {
atomic.AddInt32(&testAcquisitionBlockCount, 1)
},
},
},
}
params.LeaseManagerConfig = base.NewLeaseManagerConfig()
// The lease jitter is set to ensure newer leases have higher
// expiration timestamps.
params.LeaseManagerConfig.TableDescriptorLeaseJitterFraction = 0.0
// The renewal timeout is set to be the duration, so background
// renewal should begin immediately after accessing a lease.
params.LeaseManagerConfig.TableDescriptorLeaseRenewalTimeout =
params.LeaseManagerConfig.TableDescriptorLeaseDuration

t := newLeaseTest(testingT, params)
defer t.cleanup()

if _, err := t.db.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
`); err != nil {
t.Fatal(err)
}

tableDesc := sqlbase.GetTableDescriptor(t.kvDB, "t", "test")
dbID := tableDesc.ParentID
tableName := tableDesc.Name

// Acquire the first lease.
ts, e1, err := t.node(1).AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName)
if err != nil {
t.Fatal(err)
} else if err := t.release(1, ts); err != nil {
t.Fatal(err)
} else if count := atomic.LoadInt32(&testAcquiredCount); count != 1 {
t.Fatalf("expected 1 lease to be acquired, but acquired %d times",
count)
}

// Reset testAcquisitionBlockCount as the first acquisition will always block.
atomic.StoreInt32(&testAcquisitionBlockCount, 0)

testutils.SucceedsSoon(t, func() error {
// Acquire another lease. At first this will be the same lease, but
// eventually we will asynchronously renew a lease and our acquire will get
// a newer lease.
ts, e2, err := t.node(1).AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName)
if err != nil {
t.Fatal(err)
}
defer func() {
if err := t.release(1, ts); err != nil {
t.Fatal(err)
}
}()

// We check for the new expiry time because if our past acquire triggered
// the background renewal, the next lease we get will be the result of the
// background renewal.
if e2.WallTime <= e1.WallTime {
return errors.Errorf("expected new lease expiration (%s) to be after old lease expiration (%s)",
e2, e1)
} else if count := atomic.LoadInt32(&testAcquiredCount); count < 2 {
return errors.Errorf("expected at least 2 leases to be acquired, but acquired %d times",
count)
} else if blockCount := atomic.LoadInt32(&testAcquisitionBlockCount); blockCount > 0 {
t.Fatalf("expected repeated lease acquisition to not block, but blockCount is: %d", blockCount)
}
return nil
})
}