Skip to content

Commit

Permalink
sql: Make LeaseDuration configurable and prevent test collisions
Browse files Browse the repository at this point in the history
Previously, the global LeaseDuration value was modified during a test.
If tests doing this are run in parallel, this would cause race problems.
The global has now been modified to live on the LeaseStore where it is
referenced from. It is also now configured during server initialization.

A LeaseJitterFraction constant is also refactored out. This is used in
later path.
  • Loading branch information
lgo committed Oct 18, 2017
1 parent 3062387 commit 1c46598
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 31 deletions.
32 changes: 32 additions & 0 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ const (
// with a value of 0.2 and a liveness duration of 10 seconds, each node's
// liveness record would be eagerly renewed after 2 seconds.
livenessRenewalFraction = 0.5

// DefaultTableDescriptorLeaseDuration is the default mean duration a
// lease will be acquired for. The actual duration is jittered using
// the jitter fraction. Jittering is done to prevent multiple leases
// from being renewed simultaneously if they were all acquired
// simultaneously.
DefaultTableDescriptorLeaseDuration = 5 * time.Minute

// DefaultTableDescriptorLeaseJitterFraction is the default factor
// that we use to randomly jitter the lease duration when acquiring a
// new lease and the lease renewal timeout.
DefaultTableDescriptorLeaseJitterFraction = 0.25
)

var defaultRaftElectionTimeoutTicks = envutil.EnvOrDefaultInt(
Expand Down Expand Up @@ -466,3 +478,23 @@ func TempStorageConfigFromEnv(
Mon: &monitor,
}
}

// LeaseManagerConfig holds table lease manager parameters.
type LeaseManagerConfig struct {
// TableDescriptorLeaseDuration is the mean duration a lease will be
// acquired for.
TableDescriptorLeaseDuration time.Duration

// TableDescriptorLeaseJitterFraction is the factor that we use to
// randomly jitter the lease duration when acquiring a new lease and
// the lease renewal timeout.
TableDescriptorLeaseJitterFraction float64
}

// NewLeaseManagerConfig initializes a LeaseManagerConfig with default values.
func NewLeaseManagerConfig() *LeaseManagerConfig {
return &LeaseManagerConfig{
TableDescriptorLeaseDuration: DefaultTableDescriptorLeaseDuration,
TableDescriptorLeaseJitterFraction: DefaultTableDescriptorLeaseJitterFraction,
}
}
3 changes: 3 additions & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type TestServerArgs struct {
*cluster.Settings
RaftConfig

// LeaseManagerConfig holds configuration values specific to the LeaseManager.
LeaseManagerConfig *LeaseManagerConfig

// PartOfCluster must be set if the TestServer is joining others in a cluster.
// If not set (and hence the server is the only one in the cluster), the
// default zone config will be overridden to disable all replication - so that
Expand Down
4 changes: 4 additions & 0 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ type Config struct {

base.RaftConfig

// LeaseManagerConfig holds configuration values specific to the LeaseManager.
LeaseManagerConfig *base.LeaseManagerConfig

// Unix socket: for postgres only.
SocketFile string

Expand Down Expand Up @@ -369,6 +372,7 @@ func MakeConfig(ctx context.Context, st *cluster.Settings) Config {

cfg.Config.InitDefaults()
cfg.RaftConfig.SetDefaults()
cfg.LeaseManagerConfig = base.NewLeaseManagerConfig()
return cfg
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
lmKnobs = *leaseManagerTestingKnobs.(*sql.LeaseManagerTestingKnobs)
}
s.leaseMgr = sql.NewLeaseManager(&s.nodeIDContainer, *s.db, s.clock, lmKnobs,
s.stopper, &s.internalMemMetrics)
s.stopper, &s.internalMemMetrics, s.cfg.LeaseManagerConfig)
s.leaseMgr.RefreshLeases(s.stopper, s.db, s.gossip)

// We do not set memory monitors or a noteworthy limit because the children of
Expand Down
5 changes: 5 additions & 0 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ func makeTestConfigFromParams(params base.TestServerArgs) Config {
cfg.TestingKnobs = params.Knobs
cfg.RaftConfig = params.RaftConfig
cfg.RaftConfig.SetDefaults()
if params.LeaseManagerConfig != nil {
cfg.LeaseManagerConfig = params.LeaseManagerConfig
} else {
cfg.LeaseManagerConfig = base.NewLeaseManagerConfig()
}
if params.JoinAddr != "" {
cfg.JoinList = []string{params.JoinAddr}
}
Expand Down
40 changes: 24 additions & 16 deletions pkg/sql/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,6 @@ import (
// TODO(pmattis): Periodically renew leases for tables that were used recently and
// for which the lease will expire soon.

var (
// LeaseDuration is the mean duration a lease will be acquired for. The
// actual duration is jittered in the range
// [0.75,1.25]*LeaseDuration. Exported for testing purposes only.
LeaseDuration = 5 * time.Minute
)

// tableVersionState holds the state for a table version. This includes
// the lease information for a table version.
// TODO(vivek): A node only needs to manage lease information on what it
Expand Down Expand Up @@ -118,14 +111,26 @@ type LeaseStore struct {
clock *hlc.Clock
nodeID *base.NodeIDContainer

// leaseDuration is the mean duration a lease will be acquired for. The
// actual duration is jittered using leaseJitterFraction. Jittering is done to
// prevent multiple leases from being renewed simultaneously if they were all
// acquired simultaneously.
leaseDuration time.Duration
// leaseJitterFraction is the factor that we use to randomly jitter the lease
// duration when acquiring a new lease and the lease renewal timeout. The
// range of the actual lease duration will be
// [(1-leaseJitterFraction) * leaseDuration, (1+leaseJitterFraction) * leaseDuration]
leaseJitterFraction float64

testingKnobs LeaseStoreTestingKnobs
memMetrics *MemoryMetrics
}

// jitteredLeaseDuration returns a randomly jittered duration from the interval
// [0.75 * leaseDuration, 1.25 * leaseDuration].
func jitteredLeaseDuration() time.Duration {
return time.Duration(float64(LeaseDuration) * (0.75 + 0.5*rand.Float64()))
// [(1-leaseJitterFraction) * leaseDuration, (1+leaseJitterFraction) * leaseDuration].
func (s LeaseStore) jitteredLeaseDuration() time.Duration {
return time.Duration(float64(s.leaseDuration) * (1 - s.leaseJitterFraction +
2*s.leaseJitterFraction*rand.Float64()))
}

// acquire a lease on the most recent version of a table descriptor.
Expand All @@ -137,7 +142,7 @@ func (s LeaseStore) acquire(
var table *tableVersionState
err := s.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
expiration := txn.OrigTimestamp()
expiration.WallTime += int64(jitteredLeaseDuration())
expiration.WallTime += int64(s.jitteredLeaseDuration())
if expiration.Less(minExpirationTime) {
expiration = minExpirationTime
}
Expand Down Expand Up @@ -1096,14 +1101,17 @@ func NewLeaseManager(
testingKnobs LeaseManagerTestingKnobs,
stopper *stop.Stopper,
memMetrics *MemoryMetrics,
cfg *base.LeaseManagerConfig,
) *LeaseManager {
lm := &LeaseManager{
LeaseStore: LeaseStore{
db: db,
clock: clock,
nodeID: nodeID,
testingKnobs: testingKnobs.LeaseStoreTestingKnobs,
memMetrics: memMetrics,
db: db,
clock: clock,
nodeID: nodeID,
leaseDuration: cfg.TableDescriptorLeaseDuration,
leaseJitterFraction: cfg.TableDescriptorLeaseJitterFraction,
testingKnobs: testingKnobs.LeaseStoreTestingKnobs,
memMetrics: memMetrics,
},
testingKnobs: testingKnobs,
tableNames: tableNameCache{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/lease_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);

// Try to trigger the race repeatedly: race an AcquireByName against a
// Release.
// tableChan acts as a barrier, synchornizing the two routines at every
// tableChan acts as a barrier, synchronizing the two routines at every
// iteration.
tableChan := make(chan *sqlbase.TableDescriptor)
errChan := make(chan error)
Expand Down
27 changes: 15 additions & 12 deletions pkg/sql/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,21 @@ type leaseTest struct {
kvDB *client.DB
nodes map[uint32]*sql.LeaseManager
leaseManagerTestingKnobs sql.LeaseManagerTestingKnobs
cfg *base.LeaseManagerConfig
}

func newLeaseTest(t *testing.T, params base.TestServerArgs) *leaseTest {
if params.LeaseManagerConfig == nil {
params.LeaseManagerConfig = base.NewLeaseManagerConfig()
}
s, db, kvDB := serverutils.StartServer(t, params)
leaseTest := &leaseTest{
T: t,
server: s,
db: db,
kvDB: kvDB,
nodes: map[uint32]*sql.LeaseManager{},
cfg: params.LeaseManagerConfig,
}
if params.Knobs.SQLLeaseManager != nil {
leaseTest.leaseManagerTestingKnobs =
Expand Down Expand Up @@ -191,6 +196,7 @@ func (t *leaseTest) node(nodeID uint32) *sql.LeaseManager {
t.leaseManagerTestingKnobs,
t.server.Stopper(),
&sql.MemoryMetrics{},
t.cfg,
)
t.nodes[nodeID] = mgr
}
Expand Down Expand Up @@ -223,8 +229,10 @@ func TestLeaseManager(testingT *testing.T) {
// table and expiration.
l1, e1 := t.mustAcquire(1, descID)
l2, e2 := t.mustAcquire(1, descID)
if l1.ID != l2.ID || e1 != e2 {
if l1.ID != l2.ID {
t.Fatalf("expected same lease, but found %v != %v", l1, l2)
} else if e1 != e2 {
t.Fatalf("expected same lease timestamps, but found %v != %v", e1, e2)
}
t.expectLeases(descID, "/1/1")
// Node 2 never acquired a lease on descID, so we should expect an error.
Expand Down Expand Up @@ -303,6 +311,12 @@ func TestLeaseManager(testingT *testing.T) {
func TestLeaseManagerReacquire(testingT *testing.T) {
defer leaktest.AfterTest(testingT)()
params, _ := createTestServerParams()

params.LeaseManagerConfig = base.NewLeaseManagerConfig()
// Set the lease duration such that the next lease acquisition will
// require the lease to be reacquired.
params.LeaseManagerConfig.TableDescriptorLeaseDuration = 0

removalTracker := sql.NewLeaseRemovalTracker()
params.Knobs = base.TestingKnobs{
SQLLeaseManager: &sql.LeaseManagerTestingKnobs{
Expand All @@ -316,17 +330,6 @@ func TestLeaseManagerReacquire(testingT *testing.T) {

const descID = keys.LeaseTableID

// Set the lease duration such that the next lease acquisition will
// require the lease to be reacquired.
savedLeaseDuration := sql.LeaseDuration
defer func() {
sql.LeaseDuration = savedLeaseDuration
}()

sql.LeaseDuration = 5 * time.Nanosecond

time.Sleep(5 * sql.LeaseDuration)

l1, e1 := t.mustAcquire(1, descID)
t.expectLeases(descID, "/1/1")

Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,15 @@ func TestSchemaChangeProcess(t *testing.T) {
var id = sqlbase.ID(keys.MaxReservedDescID + 2)
var node = roachpb.NodeID(2)
stopper := stop.NewStopper()
cfg := base.NewLeaseManagerConfig()
leaseMgr := sql.NewLeaseManager(
&base.NodeIDContainer{},
*kvDB,
hlc.NewClock(hlc.UnixNano, time.Nanosecond),
sql.LeaseManagerTestingKnobs{},
stopper,
&sql.MemoryMetrics{},
cfg,
)
jobRegistry := s.JobRegistry().(*jobs.Registry)
defer stopper.Stop(context.TODO())
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/txn_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,7 @@ SELECT * from t.test WHERE k = 'test_key';
func TestReacquireLeaseOnRestart(t *testing.T) {
defer leaktest.AfterTest(t)()

advancement := 2 * sql.LeaseDuration
advancement := 2 * base.DefaultTableDescriptorLeaseDuration

var cmdFilters CommandFilters
cmdFilters.AppendFilter(checkEndTransactionTrigger, true)
Expand Down

0 comments on commit 1c46598

Please sign in to comment.