diff --git a/raft/node.go b/raft/node.go index daf068055e6..3f01cbaa290 100644 --- a/raft/node.go +++ b/raft/node.go @@ -316,7 +316,7 @@ func (n *node) run(rn *RawNode) { // handled first, but it's generally good to emit larger Readys plus // it simplifies testing (by emitting less frequently and more // predictably). - rd = rn.Ready() + rd = rn.readyWithoutAccept() readyc = n.readyc } @@ -387,7 +387,7 @@ func (n *node) run(rn *RawNode) { rn.acceptReady(rd) advancec = n.advancec case <-advancec: - rn.commitReady(rd) + rn.Advance(rd) rd = Ready{} advancec = nil case c := <-n.status: diff --git a/raft/rawnode.go b/raft/rawnode.go index 9c192fdd0d1..9aa8a699bcb 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -121,18 +121,17 @@ func (rn *RawNode) Step(m pb.Message) error { // Ready returns the outstanding work that the application needs to handle. This // includes appending and applying entries or a snapshot, updating the HardState, -// and sending messages. Ready() is a read-only operation, that is, it does not -// require the caller to actually handle the result. Typically, a caller will -// want to handle the Ready and must pass the Ready to Advance *after* having -// done so. While a Ready is being handled, the RawNode must not be used for -// operations that may alter its state. For example, it is illegal to call -// Ready, followed by Step, followed by Advance. +// and sending messages. The returned Ready() *must* be handled and subsequently +// passed back via Advance(). func (rn *RawNode) Ready() Ready { - rd := rn.newReady() + rd := rn.readyWithoutAccept() + rn.acceptReady(rd) return rd } -func (rn *RawNode) newReady() Ready { +// readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there +// is no obligation that the Ready must be handled. +func (rn *RawNode) readyWithoutAccept() Ready { return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt) } @@ -149,15 +148,6 @@ func (rn *RawNode) acceptReady(rd Ready) { rn.raft.msgs = nil } -// commitReady is called when the consumer of the RawNode has successfully -// handled a Ready (having previously called acceptReady). -func (rn *RawNode) commitReady(rd Ready) { - if !IsEmptyHardState(rd.HardState) { - rn.prevHardSt = rd.HardState - } - rn.raft.advance(rd) -} - // HasReady called when RawNode user need to check if any Ready pending. // Checking logic in this method should be consistent with Ready.containsUpdates(). func (rn *RawNode) HasReady() bool { @@ -183,12 +173,10 @@ func (rn *RawNode) HasReady() bool { // Advance notifies the RawNode that the application has applied and saved progress in the // last Ready results. func (rn *RawNode) Advance(rd Ready) { - // Advance combines accept and commit. Callers can't mutate the RawNode - // between the call to Ready and the matching call to Advance, or the work - // done in acceptReady will clobber potentially newer data that has not been - // emitted in a Ready yet. - rn.acceptReady(rd) - rn.commitReady(rd) + if !IsEmptyHardState(rd.HardState) { + rn.prevHardSt = rd.HardState + } + rn.raft.advance(rd) } // Status returns the current status of the given group. This allocates, see diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 543547a774b..2651aff2a59 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -924,3 +924,37 @@ func BenchmarkStatus(b *testing.B) { }) } } + +func TestRawNodeConsumeReady(t *testing.T) { + // Check that readyWithoutAccept() does not call acceptReady (which resets + // the messages) but Ready() does. + s := NewMemoryStorage() + rn := newTestRawNode(1, []uint64{1}, 3, 1, s) + m1 := pb.Message{Context: []byte("foo")} + m2 := pb.Message{Context: []byte("bar")} + + // Inject first message, make sure it's visible via readyWithoutAccept. + rn.raft.msgs = append(rn.raft.msgs, m1) + rd := rn.readyWithoutAccept() + if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) { + t.Fatalf("expected only m1 sent, got %+v", rd.Messages) + } + if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m1) { + t.Fatalf("expected only m1 in raft.msgs, got %+v", rn.raft.msgs) + } + // Now call Ready() which should move the message into the Ready (as opposed + // to leaving it in both places). + rd = rn.Ready() + if len(rn.raft.msgs) > 0 { + t.Fatalf("messages not reset: %+v", rn.raft.msgs) + } + if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) { + t.Fatalf("expected only m1 sent, got %+v", rd.Messages) + } + // Add a message to raft to make sure that Advance() doesn't drop it. + rn.raft.msgs = append(rn.raft.msgs, m2) + rn.Advance(rd) + if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m2) { + t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs) + } +}