Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: require app to consume result from Ready() #10920

Merged
merged 1 commit into from
Jul 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (n *node) run(rn *RawNode) {
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = rn.Ready()
rd = rn.readyWithoutAccept()
readyc = n.readyc
}

Expand Down Expand Up @@ -387,7 +387,7 @@ func (n *node) run(rn *RawNode) {
rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
rn.commitReady(rd)
rn.Advance(rd)
rd = Ready{}
advancec = nil
case c := <-n.status:
Expand Down
34 changes: 11 additions & 23 deletions raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,17 @@ func (rn *RawNode) Step(m pb.Message) error {

// Ready returns the outstanding work that the application needs to handle. This
// includes appending and applying entries or a snapshot, updating the HardState,
// and sending messages. Ready() is a read-only operation, that is, it does not
// require the caller to actually handle the result. Typically, a caller will
// want to handle the Ready and must pass the Ready to Advance *after* having
// done so. While a Ready is being handled, the RawNode must not be used for
// operations that may alter its state. For example, it is illegal to call
// Ready, followed by Step, followed by Advance.
// and sending messages. The returned Ready() *must* be handled and subsequently
// passed back via Advance().
func (rn *RawNode) Ready() Ready {
rd := rn.newReady()
rd := rn.readyWithoutAccept()
rn.acceptReady(rd)
return rd
}

func (rn *RawNode) newReady() Ready {
// readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there
// is no obligation that the Ready must be handled.
func (rn *RawNode) readyWithoutAccept() Ready {
return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
}

Expand All @@ -149,15 +148,6 @@ func (rn *RawNode) acceptReady(rd Ready) {
rn.raft.msgs = nil
}

// commitReady is called when the consumer of the RawNode has successfully
// handled a Ready (having previously called acceptReady).
func (rn *RawNode) commitReady(rd Ready) {
if !IsEmptyHardState(rd.HardState) {
rn.prevHardSt = rd.HardState
}
rn.raft.advance(rd)
}

// HasReady called when RawNode user need to check if any Ready pending.
// Checking logic in this method should be consistent with Ready.containsUpdates().
func (rn *RawNode) HasReady() bool {
Expand All @@ -183,12 +173,10 @@ func (rn *RawNode) HasReady() bool {
// Advance notifies the RawNode that the application has applied and saved progress in the
// last Ready results.
func (rn *RawNode) Advance(rd Ready) {
// Advance combines accept and commit. Callers can't mutate the RawNode
// between the call to Ready and the matching call to Advance, or the work
// done in acceptReady will clobber potentially newer data that has not been
// emitted in a Ready yet.
rn.acceptReady(rd)
rn.commitReady(rd)
if !IsEmptyHardState(rd.HardState) {
rn.prevHardSt = rd.HardState
}
rn.raft.advance(rd)
}

// Status returns the current status of the given group. This allocates, see
Expand Down
34 changes: 34 additions & 0 deletions raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,3 +924,37 @@ func BenchmarkStatus(b *testing.B) {
})
}
}

func TestRawNodeConsumeReady(t *testing.T) {
// Check that readyWithoutAccept() does not call acceptReady (which resets
// the messages) but Ready() does.
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 3, 1, s)
m1 := pb.Message{Context: []byte("foo")}
m2 := pb.Message{Context: []byte("bar")}

// Inject first message, make sure it's visible via readyWithoutAccept.
rn.raft.msgs = append(rn.raft.msgs, m1)
rd := rn.readyWithoutAccept()
if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
}
if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m1) {
t.Fatalf("expected only m1 in raft.msgs, got %+v", rn.raft.msgs)
}
// Now call Ready() which should move the message into the Ready (as opposed
// to leaving it in both places).
rd = rn.Ready()
if len(rn.raft.msgs) > 0 {
t.Fatalf("messages not reset: %+v", rn.raft.msgs)
}
if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
}
// Add a message to raft to make sure that Advance() doesn't drop it.
rn.raft.msgs = append(rn.raft.msgs, m2)
rn.Advance(rd)
if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m2) {
t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs)
}
}