Skip to content

Commit 365023d

Browse files
author
Mahmood Ali
authored
Merge pull request hashicorp#384 from hashicorp/f-stricter-leaderch-semantics
raft.LeaderCh() always deliver latest transition
2 parents e6b82b0 + 9e800fe commit 365023d

File tree

4 files changed

+78
-7
lines changed

4 files changed

+78
-7
lines changed

api.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
503503
fsm: fsm,
504504
fsmMutateCh: make(chan interface{}, 128),
505505
fsmSnapshotCh: make(chan *reqSnapshotFuture),
506-
leaderCh: make(chan bool),
506+
leaderCh: make(chan bool, 1),
507507
localID: localID,
508508
localAddr: localAddr,
509509
logger: logger,
@@ -952,10 +952,17 @@ func (r *Raft) State() RaftState {
952952
return r.getState()
953953
}
954954

955-
// LeaderCh is used to get a channel which delivers signals on
956-
// acquiring or losing leadership. It sends true if we become
957-
// the leader, and false if we lose it. The channel is not buffered,
958-
// and does not block on writes.
955+
// LeaderCh is used to get a channel which delivers signals on acquiring or
956+
// losing leadership. It sends true if we become the leader, and false if we
957+
// lose it.
958+
//
959+
// Receivers can expect to receive a notification only if leadership
960+
// transition has occured.
961+
//
962+
// If receivers aren't ready for the signal, signals may drop and only the
963+
// latest leadership transition. For example, if a receiver receives subsequent
964+
// `true` values, they may deduce that leadership was lost and regained while
965+
// the the receiver was processing first leadership transition.
959966
func (r *Raft) LeaderCh() <-chan bool {
960967
return r.leaderCh
961968
}

raft.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ func (r *Raft) runLeader() {
364364
metrics.IncrCounter([]string{"raft", "state", "leader"}, 1)
365365

366366
// Notify that we are the leader
367-
asyncNotifyBool(r.leaderCh, true)
367+
overrideNotifyBool(r.leaderCh, true)
368368

369369
// Push to the notify channel if given
370370
if notify := r.conf.NotifyCh; notify != nil {
@@ -420,7 +420,7 @@ func (r *Raft) runLeader() {
420420
r.leaderLock.Unlock()
421421

422422
// Notify that we are not the leader
423-
asyncNotifyBool(r.leaderCh, false)
423+
overrideNotifyBool(r.leaderCh, false)
424424

425425
// Push to the notify channel if given
426426
if notify := r.conf.NotifyCh; notify != nil {

util.go

+19
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,25 @@ func asyncNotifyBool(ch chan bool, v bool) {
9696
}
9797
}
9898

99+
// overrideNotifyBool is used to notify on a bool channel
100+
// but override existing value if value is present.
101+
// ch must be 1-item buffered channel.
102+
//
103+
// This method does not support multiple concurrent calls.
104+
func overrideNotifyBool(ch chan bool, v bool) {
105+
select {
106+
case ch <- v:
107+
// value sent, all done
108+
case <-ch:
109+
// channel had an old value
110+
select {
111+
case ch <- v:
112+
default:
113+
panic("race: channel was sent concurrently")
114+
}
115+
}
116+
}
117+
99118
// Decode reverses the encode operation on a byte slice input.
100119
func decodeMsgPack(buf []byte, out interface{}) error {
101120
r := bytes.NewBuffer(buf)

util_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,48 @@ func TestBackoff(t *testing.T) {
100100
t.Fatalf("bad: %v", b)
101101
}
102102
}
103+
104+
func TestOverrideNotifyBool(t *testing.T) {
105+
ch := make(chan bool, 1)
106+
107+
// sanity check - buffered channel don't have any values
108+
select {
109+
case v := <-ch:
110+
t.Fatalf("unexpected receive: %v", v)
111+
default:
112+
}
113+
114+
// simple case of a single push
115+
overrideNotifyBool(ch, false)
116+
select {
117+
case v := <-ch:
118+
if v != false {
119+
t.Fatalf("expected false but got %v", v)
120+
}
121+
default:
122+
t.Fatalf("expected a value but is not ready")
123+
}
124+
125+
// assert that function never blocks and only last item is received
126+
overrideNotifyBool(ch, false)
127+
overrideNotifyBool(ch, false)
128+
overrideNotifyBool(ch, false)
129+
overrideNotifyBool(ch, false)
130+
overrideNotifyBool(ch, true)
131+
132+
select {
133+
case v := <-ch:
134+
if v != true {
135+
t.Fatalf("expected true but got %v", v)
136+
}
137+
default:
138+
t.Fatalf("expected a value but is not ready")
139+
}
140+
141+
// no further value is available
142+
select {
143+
case v := <-ch:
144+
t.Fatalf("unexpected receive: %v", v)
145+
default:
146+
}
147+
}

0 commit comments

Comments
 (0)