Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add low-level unit test for simple Raft election #4861

Merged
merged 1 commit into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3616,7 +3616,7 @@ func (vr *voteRequest) encode() []byte {
return buf[:voteRequestLen]
}

func (n *raft) decodeVoteRequest(msg []byte, reply string) *voteRequest {
func decodeVoteRequest(msg []byte, reply string) *voteRequest {
if len(msg) != voteRequestLen {
return nil
}
Expand Down Expand Up @@ -3818,7 +3818,7 @@ func (vr *voteResponse) encode() []byte {
return buf[:voteResponseLen]
}

func (n *raft) decodeVoteResponse(msg []byte) *voteResponse {
func decodeVoteResponse(msg []byte) *voteResponse {
if len(msg) != voteResponseLen {
return nil
}
Expand All @@ -3829,7 +3829,7 @@ func (n *raft) decodeVoteResponse(msg []byte) *voteResponse {
}

func (n *raft) handleVoteResponse(sub *subscription, c *client, _ *Account, _, reply string, msg []byte) {
vr := n.decodeVoteResponse(msg)
vr := decodeVoteResponse(msg)
n.debug("Received a voteResponse %+v", vr)
if vr == nil {
n.error("Received malformed vote response for %q", n.group)
Expand Down Expand Up @@ -3903,7 +3903,7 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
}

func (n *raft) handleVoteRequest(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
vr := n.decodeVoteRequest(msg, reply)
vr := decodeVoteRequest(msg, reply)
if vr == nil {
n.error("Received malformed vote request for %q", n.group)
return
Expand Down
79 changes: 79 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"math/rand"
"testing"
"time"

"github.com/nats-io/nats.go"
)

func TestNRGSimple(t *testing.T) {
Expand Down Expand Up @@ -214,3 +216,80 @@ func TestNRGObserverMode(t *testing.T) {
require_True(t, n.node().IsObserver())
}
}

// TestNRGSimpleElection tests that a simple election succeeds. It is
// simple because the group hasn't processed any entries and hasn't
// suffered any interruptions of any kind, therefore there should be
// no way that the conditions for granting the votes can fail.
func TestNRGSimpleElection(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 9)
defer c.shutdown()
c.waitOnLeader()

nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!"))
defer nc.Close()

rg := c.createRaftGroup("TEST", 9, newStateAdder)
rg.waitOnLeader()

voteReqs := make(chan *nats.Msg, 1)
voteResps := make(chan *nats.Msg, len(rg)-1)

// Keep a record of the term when we started.
leader := rg.leader().node().(*raft)
startTerm := leader.term

// Subscribe to the vote request subject, this should be the
// same across all nodes in the group.
_, err := nc.ChanSubscribe(leader.vsubj, voteReqs)
require_NoError(t, err)

// Subscribe to all of the vote response inboxes for all nodes
// in the Raft group, as they can differ.
for _, n := range rg {
rn := n.node().(*raft)
_, err = nc.ChanSubscribe(rn.vreply, voteResps)
require_NoError(t, err)
}

// Step down, this will start a new voting session.
require_NoError(t, rg.leader().node().StepDown())

// Wait for a vote request to come in.
msg := require_ChanRead(t, voteReqs, time.Second)
vr := decodeVoteRequest(msg.Data, msg.Reply)
require_True(t, vr != nil)
require_NotEqual(t, vr.candidate, "")

// The leader should have bumped their term in order to start
// an election.
require_Equal(t, vr.term, startTerm+1)
require_Equal(t, vr.lastTerm, startTerm)

// Wait for all of the vote responses to come in. There should
// be as many vote responses as there are followers.
for i := 0; i < len(rg)-1; i++ {
msg := require_ChanRead(t, voteResps, time.Second)
re := decodeVoteResponse(msg.Data)
require_True(t, re != nil)

// The new term hasn't started yet, so the vote responses
// should contain the term from before the election. It is
// possible that candidates are listening to this to work
// out if they are in previous terms.
require_Equal(t, re.term, vr.lastTerm)
require_Equal(t, re.term, startTerm)

// The vote should have been granted.
require_Equal(t, re.granted, true)
}

// Everyone in the group should have voted for our candidate
// and arrived at the term from the vote request.
for _, n := range rg {
rn := n.node().(*raft)
require_Equal(t, rn.term, vr.term)
require_Equal(t, rn.term, startTerm+1)
require_Equal(t, rn.vote, vr.candidate)
}
}
11 changes: 11 additions & 0 deletions server/test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,17 @@ func require_LessThan[T ordered](t *testing.T, a, b T) {
}
}

func require_ChanRead[T any](t *testing.T, ch chan T, timeout time.Duration) T {
t.Helper()
select {
case v := <-ch:
return v
case <-time.After(timeout):
t.Fatalf("require read from channel within %v but didn't get anything", timeout)
}
panic("this shouldn't be possible")
}

func checkNatsError(t *testing.T, e *ApiError, id ErrorIdentifier) {
t.Helper()
ae, ok := ApiErrors[id]
Expand Down