Skip to content

Commit

Permalink
sql: Add background renewal for table descriptor leases
Browse files Browse the repository at this point in the history
This commit changes TableDescriptor lease acquisition to acquire a new
lease if the current lease is about to expire. This prevents routines
from blocking when there are no valid leases on a frequently accessed
table.

If renewal continously fails, the error will become user-facing if no
lease is acquired yet by the time a lease is expired.

Closes #17227.
  • Loading branch information
lgo committed Oct 23, 2017
1 parent 471fc16 commit 4652a1d
Show file tree
Hide file tree
Showing 3 changed files with 250 additions and 6 deletions.
11 changes: 10 additions & 1 deletion pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,16 @@ const (
// the jitter fraction. Jittering is done to prevent multiple leases
// from being renewed simultaneously if they were all acquired
// simultaneously.
DefaultTableDescriptorLeaseDuration = 5 * time.Minute
DefaultTableDescriptorLeaseDuration = 2 * time.Minute

// DefaultTableDescriptorLeaseJitterFraction is the default factor
// that we use to randomly jitter the lease duration when acquiring a
// new lease and the lease renewal timeout.
DefaultTableDescriptorLeaseJitterFraction = 0.25

// DefaultTableDescriptorLeaseRenewalTimeout is the default time
// before a lease expires when acquisition to renew the lease begins.
DefaultTableDescriptorLeaseRenewalTimeout = time.Minute
)

var defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
Expand Down Expand Up @@ -489,12 +493,17 @@ type LeaseManagerConfig struct {
// randomly jitter the lease duration when acquiring a new lease and
// the lease renewal timeout.
TableDescriptorLeaseJitterFraction float64

// DefaultTableDescriptorLeaseRenewalTimeout is the default time
// before a lease expires when acquisition to renew the lease begins.
TableDescriptorLeaseRenewalTimeout time.Duration
}

// NewLeaseManagerConfig initializes a LeaseManagerConfig with default values.
func NewLeaseManagerConfig() *LeaseManagerConfig {
return &LeaseManagerConfig{
TableDescriptorLeaseDuration: DefaultTableDescriptorLeaseDuration,
TableDescriptorLeaseJitterFraction: DefaultTableDescriptorLeaseJitterFraction,
TableDescriptorLeaseRenewalTimeout: DefaultTableDescriptorLeaseRenewalTimeout,
}
}
52 changes: 48 additions & 4 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// TODO(pmattis): Periodically renew leases for tables that were used recently and
// for which the lease will expire soon.

// tableVersionState holds the state for a table version. This includes
// the lease information for a table version.
// TODO(vivek): A node only needs to manage lease information on what it
Expand Down Expand Up @@ -122,6 +120,9 @@ type LeaseStore struct {
// range of the actual lease duration will be
// [(1-leaseJitterFraction) * leaseDuration, (1+leaseJitterFraction) * leaseDuration]
leaseJitterFraction float64
// leaseRenewalTimeout is the time before a lease expires when
// acquisition to renew the lease begins.
leaseRenewalTimeout time.Duration

testingKnobs LeaseStoreTestingKnobs
memMetrics *MemoryMetrics
Expand Down Expand Up @@ -564,6 +565,11 @@ type tableState struct {
tableNameCache *tableNameCache
stopper *stop.Stopper

// renewalInProgress is an atomic indicator for when a renewal for a
// lease has begun. This is atomic to prevent multiple routines from
// entering renewal initialization.
renewalInProgress int32

mu struct {
syncutil.Mutex

Expand Down Expand Up @@ -614,6 +620,7 @@ func (t *tableState) acquire(
return nil, result.Err
}
}

return t.findForTimestamp(ctx, timestamp, m)
}

Expand Down Expand Up @@ -1166,6 +1173,7 @@ func NewLeaseManager(
nodeID: nodeID,
leaseDuration: cfg.TableDescriptorLeaseDuration,
leaseJitterFraction: cfg.TableDescriptorLeaseJitterFraction,
leaseRenewalTimeout: cfg.TableDescriptorLeaseRenewalTimeout,
testingKnobs: testingKnobs.LeaseStoreTestingKnobs,
memMetrics: memMetrics,
},
Expand All @@ -1192,14 +1200,50 @@ func nameMatchesTable(table *sqlbase.TableDescriptor, dbID sqlbase.ID, tableName
// the timestamp. It returns the table descriptor and a expiration time.
// A transaction using this descriptor must ensure that its
// commit-timestamp < expiration-time. Care must be taken to not modify
// the returned descriptor.
// the returned descriptor. Renewal of a lease may begin in the
// background. Renewal is done in order to prevent blocking on future
// acquisitions.
func (m *LeaseManager) AcquireByName(
ctx context.Context, timestamp hlc.Timestamp, dbID sqlbase.ID, tableName string,
) (*sqlbase.TableDescriptor, hlc.Timestamp, error) {
// Check if we have cached an ID for this name.
tableVersion := m.tableNames.get(dbID, tableName, timestamp)
if tableVersion != nil {
if !timestamp.Less(tableVersion.ModificationTime) {

// Atomically check and begin a renewal if one has not already
// been set.
durationUntilExpiration := time.Duration(tableVersion.expiration.WallTime - hlc.UnixNano())
durationUntilRenewal := durationUntilExpiration - m.LeaseStore.leaseRenewalTimeout
if t := m.findTableState(tableVersion.ID, false /* create */); t != nil &&
durationUntilRenewal < 0 && atomic.CompareAndSwapInt32(&t.renewalInProgress, 0, 1) {
t.stopper.RunWorker(context.Background(), func(ctx context.Context) {
resultChan, _ := t.mu.group.DoChan(acquireGroupKey, func() (interface{}, error) {
innerCtx, span := tracing.ForkCtxSpan(ctx, "[async] lease refresh")
innerCtx = t.stopper.WithCancel(innerCtx)
defer tracing.FinishSpan(span)
if log.V(2) {
log.Infof(innerCtx,
"lease acquisition beginning for tableID=%d tableName=%q",
t.id, tableVersion.TableDescriptor.Name)
}
token, err := t.acquireNodeLease(innerCtx, m, hlc.Timestamp{})
if err != nil {
log.Errorf(innerCtx,
"lease acquisition for tableID=%d tableName=%q failed: %s",
t.id, tableVersion.TableDescriptor.Name, err)
} else if log.V(2) {
log.Infof(innerCtx,
"lease acquisition finished for tableID=%d tableName=%q",
t.id, tableVersion.TableDescriptor.Name)
}
return token, err
})
<-resultChan
atomic.StoreInt32(&t.renewalInProgress, 0)
})
}

return &tableVersion.TableDescriptor, tableVersion.expiration, nil
}
if err := m.Release(&tableVersion.TableDescriptor); err != nil {
Expand Down
193 changes: 192 additions & 1 deletion pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ func (t *leaseTest) acquire(
return t.node(nodeID).Acquire(context.TODO(), t.server.Clock().Now(), descID)
}

func (t *leaseTest) acquireByName(
nodeID uint32, dbID sqlbase.ID, name string,
) (*sqlbase.TableDescriptor, hlc.Timestamp, error) {
return t.node(nodeID).AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, name)
}

func (t *leaseTest) acquireMinVersion(
nodeID uint32, descID sqlbase.ID, minVersion sqlbase.DescriptorVersion,
) (*sqlbase.TableDescriptor, hlc.Timestamp, error) {
Expand Down Expand Up @@ -928,11 +934,196 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, _, err := leaseManager.AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName)
_, _, err := leaseManager.AcquireByName(
context.TODO(),
t.server.Clock().Now(),
dbID,
tableName,
)
if err != nil {
b.Fatal(err)
}
}
})

}

// This test makes sure the lease gets renewed automatically in the background
// if the lease is about to expire, without blocking.
func TestLeaseRefreshedAutomatically(testingT *testing.T) {
defer leaktest.AfterTest(testingT)()

var testAcquiredCount int32
var testAcquisitionBlockCount int32

params, _ := createTestServerParams()
params.Knobs = base.TestingKnobs{
SQLLeaseManager: &sql.LeaseManagerTestingKnobs{
LeaseStoreTestingKnobs: sql.LeaseStoreTestingKnobs{
// We want to track when leases get acquired and when they are renewed.
// We also want to know when acquiring blocks to test lease renewal.
LeaseAcquiredEvent: func(_ sqlbase.TableDescriptor, _ error) {

atomic.AddInt32(&testAcquiredCount, 1)
},
LeaseAcquireResultBlockEvent: func(_ sql.LeaseAcquireBlockType) {
atomic.AddInt32(&testAcquisitionBlockCount, 1)
},
},
},
}
params.LeaseManagerConfig = base.NewLeaseManagerConfig()
// The lease jitter is set to ensure newer leases have higher
// expiration timestamps.
params.LeaseManagerConfig.TableDescriptorLeaseJitterFraction = 0.0
// The renewal timeout is set to be the duration, so background
// renewal should begin immediately after accessing a lease.
params.LeaseManagerConfig.TableDescriptorLeaseRenewalTimeout =
params.LeaseManagerConfig.TableDescriptorLeaseDuration

t := newLeaseTest(testingT, params)
defer t.cleanup()

if _, err := t.db.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
`); err != nil {
t.Fatal(err)
}

tableDesc := sqlbase.GetTableDescriptor(t.kvDB, "t", "test")
dbID := tableDesc.ParentID
tableName := tableDesc.Name

// Acquire the first lease.
ts, e1, err := t.acquireByName(1, dbID, tableName)
if err != nil {
t.Fatal(err)
} else if err := t.release(1, ts); err != nil {
t.Fatal(err)
} else if count := atomic.LoadInt32(&testAcquiredCount); count != 1 {
t.Fatalf("expected 1 lease to be acquired, but acquired %d times",
count)
}

// Reset testAcquisitionBlockCount as the first acqusition will always block.
atomic.StoreInt32(&testAcquisitionBlockCount, 0)

testutils.SucceedsSoon(t, func() error {
// Acquire another lease. At first this will be the same lease, but
// eventually we will asynchronously renew a lease and our acquire will get
// a newer lease.
ts, e2, err := t.acquireByName(1, dbID, tableName)
if err != nil {
t.Fatal(err)
}
defer func() {
if err := t.release(1, ts); err != nil {
t.Fatal(err)
}
}()

// We check for the new expiry time because if our past acquire triggered
// the background refresh, the next lease we get will be the result of the
// background refresh.
if e2.WallTime <= e1.WallTime {
return errors.Errorf("expected new lease expiration (%s) to be after old lease expiration (%s)",
e2, e1)
} else if count := atomic.LoadInt32(&testAcquiredCount); count < 2 {
return errors.Errorf("expected at least 2 leases to be acquired, but acquired %d times",
count)
} else if blockCount := atomic.LoadInt32(&testAcquisitionBlockCount); blockCount > 0 {
t.Fatalf("expected repeated lease acquisition to not block, but blockCount is: %d", blockCount)
}
return nil
})
}

// This test makes sure that async lease refreshing doesn't block any routines
func TestLeaseRefreshDoesntBlock(testingT *testing.T) {
defer leaktest.AfterTest(testingT)()
var testAcquiredCount int32
var testAcquisitionBlockCount int32
params, _ := createTestServerParams()
params.Knobs = base.TestingKnobs{
SQLLeaseManager: &sql.LeaseManagerTestingKnobs{
LeaseStoreTestingKnobs: sql.LeaseStoreTestingKnobs{
// We want to track when leases get acquired and when they are renewed.
// We also want to know when acquiring blocks to test lease renewal.
LeaseAcquiredEvent: func(_ sqlbase.TableDescriptor, _ error) {
atomic.AddInt32(&testAcquiredCount, 1)
},
LeaseAcquireResultBlockEvent: func(_ sql.LeaseAcquireBlockType) {
atomic.AddInt32(&testAcquisitionBlockCount, 1)
},
},
},
}

params.LeaseManagerConfig = base.NewLeaseManagerConfig()
// We set lease duration to be longer than the expected test run time.
// This means acquiring a lease should not happen because no leases
// were available, but only by the background refreshing.
params.LeaseManagerConfig.TableDescriptorLeaseDuration = 100 * time.Millisecond
// The lease jitter is set to ensure newer leases have higher
// expiration timestamps.
params.LeaseManagerConfig.TableDescriptorLeaseJitterFraction = 0.0
// The renewal timeout is set to be 20ms less then the duration, so a
// background renewal will occur at least once during the test.
params.LeaseManagerConfig.TableDescriptorLeaseRenewalTimeout = 100*time.Millisecond -
20*time.Millisecond

t := newLeaseTest(testingT, params)
defer t.cleanup()

if _, err := t.db.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
`); err != nil {
t.Fatal(err)
}

tableDesc := sqlbase.GetTableDescriptor(t.kvDB, "t", "test")
dbID := tableDesc.ParentID
tableName := tableDesc.Name

// Acquire the first lease.
ts, _, err := t.acquireByName(1, dbID, tableName)
if err != nil {
t.Fatal(err)
} else if err := t.release(1, ts); err != nil {
t.Fatal(err)
} else if count := atomic.LoadInt32(&testAcquiredCount); count != 1 {
t.Fatalf("expected 1 lease to be acquired, but acquired %d times",
count)
}

// Reset testAcquisitionBlockCount as the first acqusition will always block.
atomic.StoreInt32(&testAcquisitionBlockCount, 0)

var wg sync.WaitGroup
numRoutines := 40
wg.Add(numRoutines)
for i := 0; i < numRoutines; i++ {
time.Sleep(1 * time.Millisecond)
go func() {
defer wg.Done()
ts, _, err := t.acquireByName(1, dbID, tableName)
if err != nil {
t.Error(err)
return
} else if err := t.release(1, ts); err != nil {
t.Error(err)
return
}
}()
}
wg.Wait()

if acquiredCount := atomic.LoadInt32(&testAcquiredCount); acquiredCount < 2 {
t.Fatalf("expected to acquire at least 2 leases, but instead acquired: %d leases", acquiredCount)
}
if blockCount := atomic.LoadInt32(&testAcquisitionBlockCount); blockCount > 0 {
t.Fatalf("expected repeated lease acquisition to not block, but blockCount is: %d", blockCount)
}
}

0 comments on commit 4652a1d

Please sign in to comment.