Skip to content

Commit

Permalink
sql: Add background renewal for table descriptor leases
Browse files Browse the repository at this point in the history
This commit changes TableDescriptor lease acquisition to acquire a new
lease if the current lease is about to expire. This prevents routines
from blocking when there are no valid leases on a frequently accessed
table.

If renewal continuously fails, the error will become user-facing if no
lease is acquired yet by the time a lease is expired.

Closes #17227.
  • Loading branch information
lgo committed Oct 24, 2017
1 parent 471fc16 commit c6a0511
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 6 deletions.
11 changes: 10 additions & 1 deletion pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,16 @@ const (
// the jitter fraction. Jittering is done to prevent multiple leases
// from being renewed simultaneously if they were all acquired
// simultaneously.
DefaultTableDescriptorLeaseDuration = 5 * time.Minute
DefaultTableDescriptorLeaseDuration = 2 * time.Minute

// DefaultTableDescriptorLeaseJitterFraction is the default factor
// that we use to randomly jitter the lease duration when acquiring a
// new lease and the lease renewal timeout.
DefaultTableDescriptorLeaseJitterFraction = 0.25

// DefaultTableDescriptorLeaseRenewalTimeout is the default time
// before a lease expires when acquisition to renew the lease begins.
DefaultTableDescriptorLeaseRenewalTimeout = time.Minute
)

var defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
Expand Down Expand Up @@ -489,12 +493,17 @@ type LeaseManagerConfig struct {
// randomly jitter the lease duration when acquiring a new lease and
// the lease renewal timeout.
TableDescriptorLeaseJitterFraction float64

// TableDescriptorLeaseRenewalTimeout is the time before a lease
// expires when acquisition to renew the lease begins.
TableDescriptorLeaseRenewalTimeout time.Duration
}

// NewLeaseManagerConfig allocates a LeaseManagerConfig and populates its
// lease duration, lease jitter fraction and lease renewal timeout from the
// corresponding package-level Default* constants.
func NewLeaseManagerConfig() *LeaseManagerConfig {
	cfg := new(LeaseManagerConfig)
	cfg.TableDescriptorLeaseDuration = DefaultTableDescriptorLeaseDuration
	cfg.TableDescriptorLeaseJitterFraction = DefaultTableDescriptorLeaseJitterFraction
	cfg.TableDescriptorLeaseRenewalTimeout = DefaultTableDescriptorLeaseRenewalTimeout
	return cfg
}
62 changes: 58 additions & 4 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync/atomic"
"time"

opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"golang.org/x/net/context"

Expand All @@ -40,11 +41,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// TODO(pmattis): Periodically renew leases for tables that were used recently and
// for which the lease will expire soon.

// tableVersionState holds the state for a table version. This includes
// the lease information for a table version.
// TODO(vivek): A node only needs to manage lease information on what it
Expand Down Expand Up @@ -122,6 +121,9 @@ type LeaseStore struct {
// range of the actual lease duration will be
// [(1-leaseJitterFraction) * leaseDuration, (1+leaseJitterFraction) * leaseDuration]
leaseJitterFraction float64
// leaseRenewalTimeout is the time before a lease expires when
// acquisition to renew the lease begins.
leaseRenewalTimeout time.Duration

testingKnobs LeaseStoreTestingKnobs
memMetrics *MemoryMetrics
Expand Down Expand Up @@ -564,6 +566,11 @@ type tableState struct {
tableNameCache *tableNameCache
stopper *stop.Stopper

// renewalInProgress is an atomic indicator for when a renewal for a
// lease has begun. This is atomic to prevent multiple routines from
// entering renewal initialization.
renewalInProgress int32

mu struct {
syncutil.Mutex

Expand Down Expand Up @@ -614,6 +621,7 @@ func (t *tableState) acquire(
return nil, result.Err
}
}

return t.findForTimestamp(ctx, timestamp, m)
}

Expand Down Expand Up @@ -970,6 +978,38 @@ func (t *tableState) purgeOldVersions(
return err
}

// renewalFn builds the worker closure that renews the lease for this table.
// The caller must have set t.renewalInProgress to 1 before invoking the
// returned closure; the closure stores 0 back into the flag once the
// acquisition attempt has completed, regardless of whether it succeeded.
func (t *tableState) renewalFn(
	m *LeaseManager, tableVersion *tableVersionState,
) func(context.Context) {
	return func(ctx context.Context) {
		// renew performs a single acquisition attempt under its own tracing
		// span and logs the outcome.
		renew := func() (interface{}, error) {
			forkedCtx, span := tracing.ForkCtxSpan(ctx, "[async] lease renewal")
			renewCtx := t.stopper.WithCancel(forkedCtx)
			defer tracing.FinishSpan(span)

			log.VEventf(renewCtx, 1,
				"lease acquisition beginning for tableID=%d tableName=%q",
				t.id, tableVersion.TableDescriptor.Name)

			token, err := t.acquireNodeLease(renewCtx, m, hlc.Timestamp{})
			if err == nil {
				log.VEventf(renewCtx, 1,
					"lease acquisition finished for tableID=%d tableName=%q",
					t.id, tableVersion.TableDescriptor.Name)
			} else {
				log.Errorf(renewCtx,
					"lease acquisition for tableID=%d tableName=%q failed: %s",
					t.id, tableVersion.TableDescriptor.Name, err)
			}
			return token, err
		}

		// Submit the attempt to the acquisition group under acquireGroupKey
		// and wait for its result before clearing the in-progress flag.
		done, _ := t.mu.group.DoChan(acquireGroupKey, renew)
		<-done
		atomic.StoreInt32(&t.renewalInProgress, 0)
	}
}

// LeaseAcquireBlockType is the type of blocking result event when
// calling LeaseAcquireResultBlockEvent.
type LeaseAcquireBlockType int
Expand Down Expand Up @@ -1166,6 +1206,7 @@ func NewLeaseManager(
nodeID: nodeID,
leaseDuration: cfg.TableDescriptorLeaseDuration,
leaseJitterFraction: cfg.TableDescriptorLeaseJitterFraction,
leaseRenewalTimeout: cfg.TableDescriptorLeaseRenewalTimeout,
testingKnobs: testingKnobs.LeaseStoreTestingKnobs,
memMetrics: memMetrics,
},
Expand All @@ -1192,14 +1233,27 @@ func nameMatchesTable(table *sqlbase.TableDescriptor, dbID sqlbase.ID, tableName
// the timestamp. It returns the table descriptor and an expiration time.
// A transaction using this descriptor must ensure that its
// commit-timestamp < expiration-time. Care must be taken to not modify
// the returned descriptor.
// the returned descriptor. Renewal of a lease may begin in the
// background. Renewal is done in order to prevent blocking on future
// acquisitions.
func (m *LeaseManager) AcquireByName(
ctx context.Context, timestamp hlc.Timestamp, dbID sqlbase.ID, tableName string,
) (*sqlbase.TableDescriptor, hlc.Timestamp, error) {
// Check if we have cached an ID for this name.
tableVersion := m.tableNames.get(dbID, tableName, timestamp)
if tableVersion != nil {
if !timestamp.Less(tableVersion.ModificationTime) {

// Atomically check and begin a renewal if one has not already
// been set.
durationUntilExpiration := time.Duration(tableVersion.expiration.WallTime - hlc.UnixNano())
durationUntilRenewal := durationUntilExpiration - m.LeaseStore.leaseRenewalTimeout
if t := m.findTableState(tableVersion.ID, false /* create */); t != nil &&
durationUntilRenewal < 0 && atomic.CompareAndSwapInt32(&t.renewalInProgress, 0, 1) {
// Start the renewal. When it finishes, it will reset t.renewalInProgress.
t.stopper.RunWorker(context.Background(), t.renewalFn(m, tableVersion))
}

return &tableVersion.TableDescriptor, tableVersion.expiration, nil
}
if err := m.Release(&tableVersion.TableDescriptor); err != nil {
Expand Down
103 changes: 102 additions & 1 deletion pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ func (t *leaseTest) mustPublish(ctx context.Context, nodeID uint32, descID sqlba
}
}

// node gets a LeaseManager corresponding to a mock node. A new lease
// manager is initialized for each node. This allows for more complex
// inter-node lease testing.
func (t *leaseTest) node(nodeID uint32) *sql.LeaseManager {
mgr := t.nodes[nodeID]
if mgr == nil {
Expand Down Expand Up @@ -928,11 +931,109 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, _, err := leaseManager.AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName)
_, _, err := leaseManager.AcquireByName(
context.TODO(),
t.server.Clock().Now(),
dbID,
tableName,
)
if err != nil {
b.Fatal(err)
}
}
})

}

// This test makes sure leases get renewed automatically in the
// background if the lease is about to expire, without blocking. We
// first acquire a lease, then continue to re-acquire it until another
// lease is renewed.
//
// The test counts acquisitions and blocking acquisitions through the
// lease store testing knobs, and sets the renewal timeout equal to the
// lease duration so that any access to a cached lease is already
// inside the renewal window.
func TestLeaseRenewedAutomatically(testingT *testing.T) {
defer leaktest.AfterTest(testingT)()

// Both counters are updated from the testing-knob callbacks and read
// from the test body, so they are only touched via sync/atomic.
var testAcquiredCount int32
var testAcquisitionBlockCount int32

params, _ := createTestServerParams()
params.Knobs = base.TestingKnobs{
SQLLeaseManager: &sql.LeaseManagerTestingKnobs{
LeaseStoreTestingKnobs: sql.LeaseStoreTestingKnobs{
// We want to track when leases get acquired and when they are renewed.
// We also want to know when acquiring blocks to test lease renewal.
LeaseAcquiredEvent: func(_ sqlbase.TableDescriptor, _ error) {

atomic.AddInt32(&testAcquiredCount, 1)
},
LeaseAcquireResultBlockEvent: func(_ sql.LeaseAcquireBlockType) {
atomic.AddInt32(&testAcquisitionBlockCount, 1)
},
},
},
}
params.LeaseManagerConfig = base.NewLeaseManagerConfig()
// The lease jitter is set to ensure newer leases have higher
// expiration timestamps.
params.LeaseManagerConfig.TableDescriptorLeaseJitterFraction = 0.0
// The renewal timeout is set to be the duration, so background
// renewal should begin immediately after accessing a lease.
params.LeaseManagerConfig.TableDescriptorLeaseRenewalTimeout =
params.LeaseManagerConfig.TableDescriptorLeaseDuration

t := newLeaseTest(testingT, params)
defer t.cleanup()

if _, err := t.db.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
`); err != nil {
t.Fatal(err)
}

// Look up the descriptor to obtain the (dbID, tableName) pair that
// AcquireByName takes.
tableDesc := sqlbase.GetTableDescriptor(t.kvDB, "t", "test")
dbID := tableDesc.ParentID
tableName := tableDesc.Name

// Acquire the first lease.
// e1 is its expiration; later acquisitions are compared against it.
ts, e1, err := t.node(1).AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName)
if err != nil {
t.Fatal(err)
} else if err := t.release(1, ts); err != nil {
t.Fatal(err)
} else if count := atomic.LoadInt32(&testAcquiredCount); count != 1 {
t.Fatalf("expected 1 lease to be acquired, but acquired %d times",
count)
}

// Reset testAcquisitionBlockCount as the first acquisition will always block.
atomic.StoreInt32(&testAcquisitionBlockCount, 0)

testutils.SucceedsSoon(t, func() error {
// Acquire another lease. At first this will be the same lease, but
// eventually we will asynchronously renew a lease and our acquire will get
// a newer lease.
ts, e2, err := t.node(1).AcquireByName(context.TODO(), t.server.Clock().Now(), dbID, tableName)
if err != nil {
t.Fatal(err)
}
// Release the lease taken by this attempt when the closure returns.
defer func() {
if err := t.release(1, ts); err != nil {
t.Fatal(err)
}
}()

// We check for the new expiry time because if our past acquire triggered
// the background renewal, the next lease we get will be the result of the
// background renewal.
if e2.WallTime <= e1.WallTime {
return errors.Errorf("expected new lease expiration (%s) to be after old lease expiration (%s)",
e2, e1)
} else if count := atomic.LoadInt32(&testAcquiredCount); count < 2 {
return errors.Errorf("expected at least 2 leases to be acquired, but acquired %d times",
count)
} else if blockCount := atomic.LoadInt32(&testAcquisitionBlockCount); blockCount > 0 {
// NOTE(review): this uses Fatalf instead of returning an error,
// presumably so that a blocked acquisition fails the test outright
// rather than being retried by SucceedsSoon — confirm.
t.Fatalf("expected repeated lease acquisition to not block, but blockCount is: %d", blockCount)
}
return nil
})
}

0 comments on commit c6a0511

Please sign in to comment.