
kvserver,rpc: set the stage for maintaining a local blocklist of permanently removed nodes #54936

Merged · 8 commits · Oct 14, 2020
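This PR threads an OnNodeDecommissioned callback through NodeLiveness so that a follow-up change can keep a local blocklist of permanently removed nodes in the rpc layer. Purely as a hypothetical sketch of the kind of consumer the hook enables — the tracker type, package, and method names below are invented for illustration and are not part of this change:

package blocklist // hypothetical package, for illustration only

import (
    "sync"

    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
    "github.com/cockroachdb/cockroach/pkg/roachpb"
)

// DecommissionedTracker records node IDs that have been observed as
// permanently removed (decommissioned). The OnNodeDecommissioned hook may
// fire more than once per node, so Add must be (and is) idempotent.
type DecommissionedTracker struct {
    mu    sync.Mutex
    nodes map[roachpb.NodeID]struct{}
}

func NewDecommissionedTracker() *DecommissionedTracker {
    return &DecommissionedTracker{nodes: make(map[roachpb.NodeID]struct{})}
}

// Add is meant to be passed as NodeLivenessOptions.OnNodeDecommissioned.
func (t *DecommissionedTracker) Add(l kvserverpb.Liveness) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.nodes[l.NodeID] = struct{}{}
}

// Contains reports whether the given node has been seen as decommissioned;
// an rpc-layer blocklist could consult this before dialing a node.
func (t *DecommissionedTracker) Contains(id roachpb.NodeID) bool {
    t.mu.Lock()
    defer t.mu.Unlock()
    _, ok := t.nodes[id]
    return ok
}

Such a tracker would be wired up as OnNodeDecommissioned: tracker.Add at construction time; since the hook can fire repeatedly for the same node, Add is written to be idempotent.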
54 changes: 32 additions & 22 deletions pkg/kv/kvserver/client_test.go
@@ -906,10 +906,11 @@ func (m *multiTestContext) addStore(idx int) {
ambient := log.AmbientContext{Tracer: cfg.Settings.Tracer}
m.populateDB(idx, cfg.Settings, stopper)
nlActive, nlRenewal := cfg.NodeLivenessDurations()
m.nodeLivenesses[idx] = kvserver.NewNodeLiveness(
ambient, m.clocks[idx], m.dbs[idx], m.gossips[idx],
nlActive, nlRenewal, cfg.Settings, metric.TestSampleInterval,
)
m.nodeLivenesses[idx] = kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{
AmbientCtx: ambient, Clock: m.clocks[idx], DB: m.dbs[idx], Gossip: m.gossips[idx],
LivenessThreshold: nlActive, RenewalDuration: nlRenewal, Settings: cfg.Settings,
HistogramWindowInterval: metric.TestSampleInterval,
})
m.populateStorePool(idx, cfg, m.nodeLivenesses[idx])
cfg.DB = m.dbs[idx]
cfg.NodeLiveness = m.nodeLivenesses[idx]
@@ -1018,15 +1019,19 @@ func (m *multiTestContext) addStore(idx int) {
m.t.Fatal(err)
}
}
m.nodeLivenesses[idx].StartHeartbeat(ctx, stopper, m.engines[idx:idx+1], func(ctx context.Context) {
now := clock.Now()
if err := store.WriteLastUpTimestamp(ctx, now); err != nil {
log.Warningf(ctx, "%v", err)
}
ran.Do(func() {
close(ran.ch)
})
})
m.nodeLivenesses[idx].Start(ctx,
kvserver.NodeLivenessStartOptions{
Stopper: stopper,
Engines: m.engines[idx : idx+1],
OnSelfLive: func(ctx context.Context) {
now := clock.Now()
if err := store.WriteLastUpTimestamp(ctx, now); err != nil {
log.Warningf(ctx, "%v", err)
}
ran.Do(func() {
close(ran.ch)
})
}})

store.WaitForInit()

@@ -1095,10 +1100,11 @@ func (m *multiTestContext) restartStoreWithoutHeartbeat(i int) {
cfg := m.makeStoreConfig(i)
m.populateDB(i, m.storeConfig.Settings, stopper)
nlActive, nlRenewal := cfg.NodeLivenessDurations()
m.nodeLivenesses[i] = kvserver.NewNodeLiveness(
log.AmbientContext{Tracer: m.storeConfig.Settings.Tracer}, m.clocks[i], m.dbs[i],
m.gossips[i], nlActive, nlRenewal, cfg.Settings, metric.TestSampleInterval,
)
m.nodeLivenesses[i] = kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{
AmbientCtx: log.AmbientContext{Tracer: m.storeConfig.Settings.Tracer}, Clock: m.clocks[i], DB: m.dbs[i],
Gossip: m.gossips[i], LivenessThreshold: nlActive, RenewalDuration: nlRenewal, Settings: cfg.Settings,
HistogramWindowInterval: metric.TestSampleInterval,
})
m.populateStorePool(i, cfg, m.nodeLivenesses[i])
cfg.DB = m.dbs[i]
cfg.NodeLiveness = m.nodeLivenesses[i]
@@ -1115,11 +1121,15 @@ func (m *multiTestContext) restartStoreWithoutHeartbeat(i int) {
m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.DefaultClass).Reset()
m.transport.GetCircuitBreaker(m.idents[i].NodeID, rpc.SystemClass).Reset()
m.mu.Unlock()
cfg.NodeLiveness.StartHeartbeat(ctx, stopper, m.engines[i:i+1], func(ctx context.Context) {
now := m.clocks[i].Now()
if err := store.WriteLastUpTimestamp(ctx, now); err != nil {
log.Warningf(ctx, "%v", err)
}
cfg.NodeLiveness.Start(ctx, kvserver.NodeLivenessStartOptions{
Stopper: stopper,
Engines: m.engines[i : i+1],
OnSelfLive: func(ctx context.Context) {
now := m.clocks[i].Now()
if err := store.WriteLastUpTimestamp(ctx, now); err != nil {
log.Warningf(ctx, "%v", err)
}
},
})
}

139 changes: 85 additions & 54 deletions pkg/kv/kvserver/node_liveness.go
@@ -176,13 +176,14 @@ type NodeLiveness struct {
// heartbeatPaused contains an atomically-swapped number representing a bool
// (1 or 0). heartbeatToken is a channel containing a token which is taken
// when heartbeating or when pausing the heartbeat. Used for testing.
heartbeatPaused uint32
heartbeatToken chan struct{}
metrics LivenessMetrics
heartbeatPaused uint32
heartbeatToken chan struct{}
metrics LivenessMetrics
onNodeDecommissioned func(kvserverpb.Liveness) // noop if nil

mu struct {
syncutil.RWMutex
callbacks []IsLiveCallback
onIsLive []IsLiveCallback // see NodeLivenessOptions.OnSelfLive
// nodes is an in-memory cache of liveness records that NodeLiveness
// knows about (having learnt of them through gossip or through KV).
// It's a look-aside cache, and is accessed primarily through
@@ -205,7 +206,7 @@ type NodeLiveness struct {
// - Update the liveness record in KV
// - Add the updated record into this cache (see `maybeUpdate`)
//
// (See `StartHeartbeat` for an example of this pattern.)
// (See `Start` for an example of this pattern.)
//
// What we want instead is a bit simpler:
//
@@ -216,11 +217,11 @@ type NodeLiveness struct {
//
// More concretely, we want `getLivenessRecordFromKV` to be tucked away
// within `getLivenessLocked`.
nodes map[roachpb.NodeID]LivenessRecord
heartbeatCallback HeartbeatCallback
nodes map[roachpb.NodeID]LivenessRecord
onSelfLive HeartbeatCallback // set in Start()
// Before heartbeating, we write to each of these engines to avoid
// maintaining liveness when a local disk is stalled.
engines []storage.Engine
engines []storage.Engine // set in Start()
}
}
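The TODO in the comment above wants the "check the cache, then explicitly fall back to KV" dance collapsed into a single look-aside lookup. A self-contained toy of that shape, with generic names rather than the kvserver types:

package main

import (
    "fmt"
    "sync"
)

// record stands in for a liveness record; loader stands in for a KV read.
type record struct{ epoch int }

type lookAsideCache struct {
    mu     sync.RWMutex
    nodes  map[int]record
    loader func(id int) (record, bool) // e.g. a KV lookup
}

// get consults the in-memory map first and falls back to the loader on a
// miss, writing the loaded value back into the cache — the single-lookup
// shape the comment above asks for.
func (c *lookAsideCache) get(id int) (record, bool) {
    c.mu.RLock()
    r, ok := c.nodes[id]
    c.mu.RUnlock()
    if ok {
        return r, true
    }
    r, ok = c.loader(id)
    if ok {
        c.mu.Lock()
        c.nodes[id] = r
        c.mu.Unlock()
    }
    return r, ok
}

func main() {
    c := &lookAsideCache{
        nodes:  map[int]record{},
        loader: func(id int) (record, bool) { return record{epoch: 1}, true },
    }
    fmt.Println(c.get(7)) // miss -> loaded from "KV" -> cached
    fmt.Println(c.get(7)) // hit
}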

@@ -235,41 +236,61 @@ type LivenessRecord struct {
raw []byte
}

// NodeLivenessOptions is the input to NewNodeLiveness.
//
// Note that there is yet another struct, NodeLivenessStartOptions, which
// is supplied when the instance is started. This is necessary as during
// server startup, some inputs can only be constructed at Start time. The
// separation has grown organically and various options could in principle
// be moved back and forth.
type NodeLivenessOptions struct {
AmbientCtx log.AmbientContext
Settings *cluster.Settings
Gossip *gossip.Gossip
Clock *hlc.Clock
DB *kv.DB
LivenessThreshold time.Duration
RenewalDuration time.Duration
HistogramWindowInterval time.Duration
// OnNodeDecommissioned is invoked whenever the instance learns that a
// node was permanently removed from the cluster. It must be idempotent
// as it may be invoked multiple times; if nil, it defaults to a noop.
OnNodeDecommissioned func(kvserverpb.Liveness)
}

// NewNodeLiveness returns a new instance of NodeLiveness configured
// with the given options.
func NewNodeLiveness(
ambient log.AmbientContext,
clock *hlc.Clock,
db *kv.DB,
g *gossip.Gossip,
livenessThreshold time.Duration,
renewalDuration time.Duration,
st *cluster.Settings,
histogramWindow time.Duration,
) *NodeLiveness {
func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
nl := &NodeLiveness{
ambientCtx: ambient,
clock: clock,
db: db,
gossip: g,
livenessThreshold: livenessThreshold,
heartbeatInterval: livenessThreshold - renewalDuration,
selfSem: make(chan struct{}, 1),
st: st,
otherSem: make(chan struct{}, 1),
heartbeatToken: make(chan struct{}, 1),
ambientCtx: opts.AmbientCtx,
clock: opts.Clock,
db: opts.DB,
gossip: opts.Gossip,
livenessThreshold: opts.LivenessThreshold,
heartbeatInterval: opts.LivenessThreshold - opts.RenewalDuration,
selfSem: make(chan struct{}, 1),
st: opts.Settings,
otherSem: make(chan struct{}, 1),
heartbeatToken: make(chan struct{}, 1),
onNodeDecommissioned: opts.OnNodeDecommissioned,
}
nl.metrics = LivenessMetrics{
LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes),
HeartbeatsInFlight: metric.NewGauge(metaHeartbeatsInFlight),
HeartbeatSuccesses: metric.NewCounter(metaHeartbeatSuccesses),
HeartbeatFailures: metric.NewCounter(metaHeartbeatFailures),
EpochIncrements: metric.NewCounter(metaEpochIncrements),
HeartbeatLatency: metric.NewLatency(metaHeartbeatLatency, histogramWindow),
HeartbeatLatency: metric.NewLatency(metaHeartbeatLatency, opts.HistogramWindowInterval),
}
nl.mu.nodes = make(map[roachpb.NodeID]LivenessRecord)
nl.heartbeatToken <- struct{}{}

// NB: we should consider moving this registration to .Start() once we
// have ensured that nobody uses the server's KV client (kv.DB) before
// nl.Start() is invoked. At the time of writing this invariant does
// not hold (which is a problem, since the node itself won't be live
// at this point, and requests routed to it will hang).
livenessRegex := gossip.MakePrefixPattern(gossip.KeyNodeLivenessPrefix)
nl.gossip.RegisterCallback(livenessRegex, nl.livenessGossipUpdate)
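
For reference, the way the two option structs compose mirrors the client_test.go changes earlier in this diff; below is a condensed, illustrative helper (not code from this PR), with all inputs assumed to be supplied by the caller:

package livenessexample // illustrative placement, not part of the PR

import (
    "context"
    "time"

    "github.com/cockroachdb/cockroach/pkg/gossip"
    "github.com/cockroachdb/cockroach/pkg/kv"
    "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
    "github.com/cockroachdb/cockroach/pkg/storage"
    "github.com/cockroachdb/cockroach/pkg/util/hlc"
    "github.com/cockroachdb/cockroach/pkg/util/log"
    "github.com/cockroachdb/cockroach/pkg/util/stop"
)

// startNodeLiveness shows how NodeLivenessOptions (construction-time inputs)
// and NodeLivenessStartOptions (start-time inputs) compose.
func startNodeLiveness(
    ctx context.Context,
    ambient log.AmbientContext,
    st *cluster.Settings,
    g *gossip.Gossip,
    clock *hlc.Clock,
    db *kv.DB,
    nlActive, nlRenewal, histogramWindow time.Duration,
    stopper *stop.Stopper,
    engines []storage.Engine,
    onDecommissioned func(kvserverpb.Liveness),
    onSelfLive kvserver.HeartbeatCallback,
) *kvserver.NodeLiveness {
    nl := kvserver.NewNodeLiveness(kvserver.NodeLivenessOptions{
        AmbientCtx:              ambient,
        Settings:                st,
        Gossip:                  g,
        Clock:                   clock,
        DB:                      db,
        LivenessThreshold:       nlActive,
        RenewalDuration:         nlRenewal,
        HistogramWindowInterval: histogramWindow,
        OnNodeDecommissioned:    onDecommissioned, // noop if nil
    })
    nl.Start(ctx, kvserver.NodeLivenessStartOptions{
        Stopper:    stopper,
        Engines:    engines, // must be non-empty; Start fatals otherwise
        OnSelfLive: onSelfLive,
    })
    return nl
}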

@@ -499,7 +520,7 @@ type livenessUpdate struct {
// given node ID. This is typically used when adding a new node to a running
// cluster, or when bootstrapping a cluster through a given node.
//
// This is a pared down version of StartHeartbeat; it exists only to durably
// This is a pared down version of Start; it exists only to durably
// persist a liveness to record the node's existence. Nodes will heartbeat their
// records after starting up, incrementing to epoch=1 when doing so, at
// which point we'll set an appropriate expiration timestamp, gossip the
@@ -619,32 +640,39 @@ func (nl *NodeLiveness) IsLive(nodeID roachpb.NodeID) (bool, error) {
return liveness.IsLive(nl.clock.Now().GoTime()), nil
}

// StartHeartbeat starts a periodic heartbeat to refresh this node's last
// NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`.
type NodeLivenessStartOptions struct {
Stopper *stop.Stopper
Engines []storage.Engine
// OnSelfLive is invoked after every successful heartbeat
// of the local liveness instance's heartbeat loop.
OnSelfLive HeartbeatCallback
}

Review comment (Contributor), on the OnSelfLive doc comment above: Could move this comment to NodeLiveness.mu.onSelfLive, probably the first place to look.

// Start starts a periodic heartbeat to refresh this node's last
// heartbeat in the node liveness table. The optionally provided
// HeartbeatCallback will be invoked whenever this node updates its own
// liveness. The slice of engines will be written to before each heartbeat to
// avoid maintaining liveness in the presence of disk stalls.
func (nl *NodeLiveness) StartHeartbeat(
ctx context.Context, stopper *stop.Stopper, engines []storage.Engine, alive HeartbeatCallback,
) {
log.VEventf(ctx, 1, "starting liveness heartbeat")
// HeartbeatCallback will be invoked whenever this node updates its
// own liveness. The slice of engines will be written to before each
// heartbeat to avoid maintaining liveness in the presence of disk stalls.
func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions) {
log.VEventf(ctx, 1, "starting node liveness instance")
retryOpts := base.DefaultRetryOptions()
retryOpts.Closer = stopper.ShouldQuiesce()
retryOpts.Closer = opts.Stopper.ShouldQuiesce()

if len(engines) == 0 {
if len(opts.Engines) == 0 {
// Avoid silently forgetting to pass the engines. It happened before.
log.Fatalf(ctx, "must supply at least one engine")
}

nl.mu.Lock()
nl.mu.heartbeatCallback = alive
nl.mu.engines = engines
nl.mu.onSelfLive = opts.OnSelfLive
nl.mu.engines = opts.Engines
nl.mu.Unlock()

stopper.RunWorker(ctx, func(context.Context) {
opts.Stopper.RunWorker(ctx, func(context.Context) {
ambient := nl.ambientCtx
ambient.AddLogTag("liveness-hb", nil)
ctx, cancel := stopper.WithCancelOnStop(context.Background())
ctx, cancel := opts.Stopper.WithCancelOnStop(context.Background())
defer cancel()
ctx, sp := ambient.AnnotateCtxWithSpan(ctx, "liveness heartbeat loop")
defer sp.Finish()
Expand All @@ -655,7 +683,7 @@ func (nl *NodeLiveness) StartHeartbeat(
for {
select {
case <-nl.heartbeatToken:
case <-stopper.ShouldStop():
case <-opts.Stopper.ShouldStop():
return
}
// Give the context a timeout approximately as long as the time we
@@ -692,7 +720,7 @@ func (nl *NodeLiveness) StartHeartbeat(
nl.heartbeatToken <- struct{}{}
select {
case <-ticker.C:
case <-stopper.ShouldStop():
case <-opts.Stopper.ShouldStop():
return
}
}
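
An aside on the loop above: it must hold nl.heartbeatToken to heartbeat and hands the token back before sleeping, which is what the Pause*ForTest helpers further down rely on. A standalone toy of that token handshake, with illustrative names rather than CockroachDB code:

package main

import (
    "fmt"
    "time"
)

// heartbeatLoop mimics the pattern in NodeLiveness.Start: acquire the token,
// do one heartbeat, hand the token back, then wait for the next tick.
func heartbeatLoop(token chan struct{}, stopC chan struct{}) {
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-token: // blocks while a "pauser" holds the token
        case <-stopC:
            return
        }
        fmt.Println("heartbeat")
        token <- struct{}{} // release the token
        select {
        case <-ticker.C:
        case <-stopC:
            return
        }
    }
}

func main() {
    token := make(chan struct{}, 1)
    stopC := make(chan struct{})
    token <- struct{}{} // the loop may run initially
    go heartbeatLoop(token, stopC)

    time.Sleep(35 * time.Millisecond)
    <-token // "pause": the loop now blocks before its next heartbeat
    fmt.Println("paused")
    time.Sleep(35 * time.Millisecond)
    token <- struct{}{} // "resume"
    time.Sleep(25 * time.Millisecond)
    close(stopC)
    time.Sleep(5 * time.Millisecond) // give the goroutine a moment to exit
}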
@@ -726,7 +754,7 @@ func (nl *NodeLiveness) PauseHeartbeatLoopForTest() func() {
}

// PauseSynchronousHeartbeatsForTest disables all node liveness
// heartbeats triggered from outside the normal StartHeartbeat loop.
// heartbeats triggered from outside the normal Start loop.
// Returns a closure to call to re-enable synchronous heartbeats. Only
// safe for use in tests.
func (nl *NodeLiveness) PauseSynchronousHeartbeatsForTest() func() {
@@ -739,7 +767,7 @@ func (nl *NodeLiveness) PauseSynchronousHeartbeatsForTest() func() {
}

// PauseAllHeartbeatsForTest disables all node liveness heartbeats,
// including those triggered from outside the normal StartHeartbeat
// including those triggered from outside the normal Start
// loop. Returns a closure to call to re-enable heartbeats. Only safe
// for use in tests.
func (nl *NodeLiveness) PauseAllHeartbeatsForTest() func() {
@@ -769,7 +797,7 @@ var errNodeAlreadyLive = errors.New("node already live")
// TODO(bdarnell): Fix error semantics here.
//
// This method is rarely called directly; heartbeats are normally sent
// by the StartHeartbeat loop.
// by the Start loop.
// TODO(bdarnell): Should we just remove this synchronous heartbeat completely?
func (nl *NodeLiveness) Heartbeat(ctx context.Context, liveness kvserverpb.Liveness) error {
return nl.heartbeatInternal(ctx, liveness, false /* increment epoch */)
@@ -1113,7 +1141,7 @@ func (nl *NodeLiveness) Metrics() LivenessMetrics {
func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) {
nl.mu.Lock()
defer nl.mu.Unlock()
nl.mu.callbacks = append(nl.mu.callbacks, cb)
nl.mu.onIsLive = append(nl.mu.onIsLive, cb)
}

// updateLiveness does a conditional put on the node liveness record for the
Expand Down Expand Up @@ -1237,7 +1265,7 @@ func (nl *NodeLiveness) updateLivenessAttempt(
}

nl.mu.RLock()
cb := nl.mu.heartbeatCallback
cb := nl.mu.onSelfLive
nl.mu.RUnlock()
if cb != nil {
cb(ctx)
@@ -1261,10 +1289,10 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Liveness
shouldReplace = shouldReplaceLiveness(ctx, oldLivenessRec.Liveness, newLivenessRec.Liveness)
}

var callbacks []IsLiveCallback
var onIsLive []IsLiveCallback
if shouldReplace {
nl.mu.nodes[newLivenessRec.NodeID] = newLivenessRec
callbacks = append(callbacks, nl.mu.callbacks...)
onIsLive = append(onIsLive, nl.mu.onIsLive...)
}
nl.mu.Unlock()

@@ -1274,10 +1302,13 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Liveness

now := nl.clock.Now().GoTime()
if !oldLivenessRec.IsLive(now) && newLivenessRec.IsLive(now) {
for _, fn := range callbacks {
for _, fn := range onIsLive {
fn(newLivenessRec.Liveness)
}
}
if newLivenessRec.Membership.Decommissioned() && nl.onNodeDecommissioned != nil {
nl.onNodeDecommissioned(newLivenessRec.Liveness)
}
}

// shouldReplaceLiveness checks to see if the new liveness is in fact newer