Skip to content

Commit

Permalink
sql: Make LeaseDuration configurable and prevent test collisions
Browse files Browse the repository at this point in the history
Previously, the global LeaseDuration value was modified during a test.
If tests doing this are run in parallel, this would cause race problems.
The global has now been modified to live on the LeaseStore where it is
referenced from. It is also now configured during server initialization.

A LeaseJitterFraction constant is also refactored out. This is used in
later path.
  • Loading branch information
lgo committed Oct 16, 2017
1 parent 3062387 commit 43feb10
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 29 deletions.
37 changes: 37 additions & 0 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ const (
// with a value of 0.2 and a liveness duration of 10 seconds, each node's
// liveness record would be eagerly renewed after 2 seconds.
livenessRenewalFraction = 0.5

// DefaultTableDescriptorLeaseDuration is the default mean duration a
// lease will be acquired for. The actual duration is jittered using
// the jitter fraction. Jittering is done to prevent multiple leases
// from being renewed simultaneously if they were all acquired
// simultaneously.
DefaultTableDescriptorLeaseDuration = 5 * time.Minute

// DefaultTableDescriptorLeaseJitterFraction is the default factor
// that we use to randomly jitter the lease duration when acquiring a
// new lease and the lease renewal timeout.
DefaultTableDescriptorLeaseJitterFraction = 0.25
)

var defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
Expand Down Expand Up @@ -466,3 +478,28 @@ func TempStorageConfigFromEnv(
Mon: &monitor,
}
}

// LeaseManagerConfig holds table lease manager parameters.
type LeaseManagerConfig struct {
// TableDescriptorLeaseDuration is the mean duration a lease will be
// acquired for.
TableDescriptorLeaseDuration time.Duration

// TableDescriptorLeaseJitterFraction is the factor that we use to
// randomly jitter the lease duration when acquiring a new lease and
// the lease renewal timeout.
// A pointer is used because the zero-value is a commonly cconfigured
// value.
TableDescriptorLeaseJitterFraction *float64
}

// SetDefaults initializes unset fields.
func (cfg *LeaseManagerConfig) SetDefaults() {
if cfg.TableDescriptorLeaseDuration == 0 {
cfg.TableDescriptorLeaseDuration = DefaultTableDescriptorLeaseDuration
}
if cfg.TableDescriptorLeaseJitterFraction == nil {
leaseJitterFraction := DefaultTableDescriptorLeaseJitterFraction
cfg.TableDescriptorLeaseJitterFraction = &leaseJitterFraction
}
}
1 change: 1 addition & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type TestServerArgs struct {

*cluster.Settings
RaftConfig
LeaseManagerConfig

// PartOfCluster must be set if the TestServer is joining others in a cluster.
// If not set (and hence the server is the only one in the cluster), the
Expand Down
1 change: 1 addition & 0 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type Config struct {
Settings *cluster.Settings

base.RaftConfig
base.LeaseManagerConfig

// Unix socket: for postgres only.
SocketFile string
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
lmKnobs = *leaseManagerTestingKnobs.(*sql.LeaseManagerTestingKnobs)
}
s.leaseMgr = sql.NewLeaseManager(&s.nodeIDContainer, *s.db, s.clock, lmKnobs,
s.stopper, &s.internalMemMetrics)
s.stopper, &s.internalMemMetrics, s.cfg.LeaseManagerConfig)
s.leaseMgr.RefreshLeases(s.stopper, s.db, s.gossip)

// We do not set memory monitors or a noteworthy limit because the children of
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config {
cfg.TestingKnobs = params.Knobs
cfg.RaftConfig = params.RaftConfig
cfg.RaftConfig.SetDefaults()
cfg.LeaseManagerConfig = params.LeaseManagerConfig
cfg.LeaseManagerConfig.SetDefaults()
if params.JoinAddr != "" {
cfg.JoinList = []string{params.JoinAddr}
}
Expand Down
41 changes: 25 additions & 16 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,6 @@ import (
// TODO(pmattis): Periodically renew leases for tables that were used recently and
// for which the lease will expire soon.

var (
// LeaseDuration is the mean duration a lease will be acquired for. The
// actual duration is jittered in the range
// [0.75,1.25]*LeaseDuration. Exported for testing purposes only.
LeaseDuration = 5 * time.Minute
)

// tableVersionState holds the state for a table version. This includes
// the lease information for a table version.
// TODO(vivek): A node only needs to manage lease information on what it
Expand Down Expand Up @@ -118,14 +111,27 @@ type LeaseStore struct {
clock *hlc.Clock
nodeID *base.NodeIDContainer

// leaseDuration is the mean duration a lease will be acquired for. The
// actual duration is jittered using leaseJitterFraction. Jittering is done to
// prevent multiple leases from being renewed simultaneously if they were all
// acquired simultaneously.
leaseDuration time.Duration
// leaseJitterFraction is the factor that we use to randomly jitter the lease
// duration when acquiring a new lease and the lease renewal timeout. The
// range of the actual lease duration will be
// [(1-leaseJitterFraction) * leaseDuration, (1+leaseJitterFraction) * leaseDuration]
// Exported for testing purposes only.
leaseJitterFraction float64

testingKnobs LeaseStoreTestingKnobs
memMetrics *MemoryMetrics
}

// jitteredLeaseDuration returns a randomly jittered duration from the interval
// [0.75 * leaseDuration, 1.25 * leaseDuration].
func jitteredLeaseDuration() time.Duration {
return time.Duration(float64(LeaseDuration) * (0.75 + 0.5*rand.Float64()))
// [(1-leaseJitterFraction) * leaseDuration, (1+leaseJitterFraction) * leaseDuration].
func (s LeaseStore) jitteredLeaseDuration() time.Duration {
return time.Duration(float64(s.leaseDuration) * (1 - s.leaseJitterFraction +
2*s.leaseJitterFraction*rand.Float64()))
}

// acquire a lease on the most recent version of a table descriptor.
Expand All @@ -137,7 +143,7 @@ func (s LeaseStore) acquire(
var table *tableVersionState
err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
expiration := txn.OrigTimestamp()
expiration.WallTime += int64(jitteredLeaseDuration())
expiration.WallTime += int64(s.jitteredLeaseDuration())
if expiration.Less(minExpirationTime) {
expiration = minExpirationTime
}
Expand Down Expand Up @@ -1096,14 +1102,17 @@ func NewLeaseManager(
testingKnobs LeaseManagerTestingKnobs,
stopper *stop.Stopper,
memMetrics *MemoryMetrics,
cfg base.LeaseManagerConfig,
) *LeaseManager {
lm := &LeaseManager{
LeaseStore: LeaseStore{
db: db,
clock: clock,
nodeID: nodeID,
testingKnobs: testingKnobs.LeaseStoreTestingKnobs,
memMetrics: memMetrics,
db: db,
clock: clock,
nodeID: nodeID,
leaseDuration: cfg.TableDescriptorLeaseDuration,
leaseJitterFraction: *cfg.TableDescriptorLeaseJitterFraction,
testingKnobs: testingKnobs.LeaseStoreTestingKnobs,
memMetrics: memMetrics,
},
testingKnobs: testingKnobs,
tableNames: tableNameCache{
Expand Down
18 changes: 7 additions & 11 deletions pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,19 @@ type leaseTest struct {
kvDB *client.DB
nodes map[uint32]*sql.LeaseManager
leaseManagerTestingKnobs sql.LeaseManagerTestingKnobs
cfg base.LeaseManagerConfig
}

func newLeaseTest(t *testing.T, params base.TestServerArgs) *leaseTest {
params.LeaseManagerConfig.SetDefaults()
s, db, kvDB := serverutils.StartServer(t, params)
leaseTest := &leaseTest{
T: t,
server: s,
db: db,
kvDB: kvDB,
nodes: map[uint32]*sql.LeaseManager{},
cfg: params.LeaseManagerConfig,
}
if params.Knobs.SQLLeaseManager != nil {
leaseTest.leaseManagerTestingKnobs =
Expand Down Expand Up @@ -191,6 +194,7 @@ func (t *leaseTest) node(nodeID uint32) *sql.LeaseManager {
t.leaseManagerTestingKnobs,
t.server.Stopper(),
&sql.MemoryMetrics{},
t.cfg,
)
t.nodes[nodeID] = mgr
}
Expand Down Expand Up @@ -303,6 +307,9 @@ func TestLeaseManager(testingT *testing.T) {
func TestLeaseManagerReacquire(testingT *testing.T) {
defer leaktest.AfterTest(testingT)()
params, _ := createTestServerParams()
// Set the lease duration such that the next lease acquisition will
// require the lease to be reacquired.
params.LeaseManagerConfig.TableDescriptorLeaseDuration = 5 * time.Nanosecond
removalTracker := sql.NewLeaseRemovalTracker()
params.Knobs = base.TestingKnobs{
SQLLeaseManager: &sql.LeaseManagerTestingKnobs{
Expand All @@ -316,17 +323,6 @@ func TestLeaseManagerReacquire(testingT *testing.T) {

const descID = keys.LeaseTableID

// Set the lease duration such that the next lease acquisition will
// require the lease to be reacquired.
savedLeaseDuration := sql.LeaseDuration
defer func() {
sql.LeaseDuration = savedLeaseDuration
}()

sql.LeaseDuration = 5 * time.Nanosecond

time.Sleep(5 * sql.LeaseDuration)

l1, e1 := t.mustAcquire(1, descID)
t.expectLeases(descID, "/1/1")

Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,16 @@ func TestSchemaChangeProcess(t *testing.T) {
var id = sqlbase.ID(keys.MaxReservedDescID + 2)
var node = roachpb.NodeID(2)
stopper := stop.NewStopper()
cfg := base.LeaseManagerConfig{}
cfg.SetDefaults()
leaseMgr := sql.NewLeaseManager(
&base.NodeIDContainer{},
*kvDB,
hlc.NewClock(hlc.UnixNano, time.Nanosecond),
sql.LeaseManagerTestingKnobs{},
stopper,
&sql.MemoryMetrics{},
cfg,
)
jobRegistry := s.JobRegistry().(*jobs.Registry)
defer stopper.Stop(context.TODO())
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/txn_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,7 @@ SELECT * from t.test WHERE k = 'test_key';
func TestReacquireLeaseOnRestart(t *testing.T) {
defer leaktest.AfterTest(t)()

advancement := 2 * sql.LeaseDuration
advancement := 2 * base.DefaultTableDescriptorLeaseDuration

var cmdFilters CommandFilters
cmdFilters.AppendFilter(checkEndTransactionTrigger, true)
Expand Down

0 comments on commit 43feb10

Please sign in to comment.