Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix raft node getting stuck in candidate state #2418

Merged
merged 5 commits into from
Apr 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- [#2404](https://github.com/influxdb/influxdb/pull/2404): Mean and percentile function fixes
- [#2408](https://github.com/influxdb/influxdb/pull/2408): Fix snapshot 500 error
- [#1896](https://github.com/influxdb/influxdb/issues/1896): Excessive heartbeater logging of "connection refused" on cluster node stop
- [#2418](https://github.com/influxdb/influxdb/pull/2418): Fix raft node getting stuck in candidate state

### Features
- [#2410](https://github.com/influxdb/influxdb/pull/2410) Allow configuration of Raft timers
Expand Down
3 changes: 1 addition & 2 deletions cmd/influxd/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster,
expected = fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",%d]]}]}]}`, numPoints-1)
got, ok, nOK := queryAndWait(t, nodes, database, `SELECT count(value) FROM cpu`, expected, "", 120*time.Second)
if !ok {
t.Errorf("test %s:rawDataReturnsInOrder failed, SELECT count() query returned unexpected data\nexp: %s\n, got: %s\n%d nodes responded correctly", testName, expected, got, nOK)
t.Errorf("test %s:rawDataReturnsInOrder failed, SELECT count() query returned unexpected data\nexp: %s\ngot: %s\n%d nodes responded correctly", testName, expected, got, nOK)
dumpClusterDiags(t, testName, nodes)
dumpClusterStats(t, testName, nodes)
}
Expand Down Expand Up @@ -1524,7 +1524,6 @@ func Test3NodeServerFailover(t *testing.T) {
// ensure that all queries work if there are more nodes in a cluster than the replication factor
// and there is more than one shard
func Test5NodeClusterPartiallyReplicated(t *testing.T) {
t.Skip("unstable, skipping for now")
t.Parallel()
testName := "5-node server integration partial replication"
if testing.Short() {
Expand Down
73 changes: 41 additions & 32 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (l *Log) Open(path string) error {
go l.applier(l.closing)

if l.config != nil {
l.Logger.Printf("log open: created at %s, with ID %d, term %d, last applied index of %d", path, l.id, l.term, l.lastLogIndex)
l.printf("log open: created at %s, with ID %d, term %d, last applied index of %d", path, l.id, l.term, l.lastLogIndex)
}

// Retrieve variables to use while starting state loop.
Expand All @@ -419,7 +419,7 @@ func (l *Log) Open(path string) error {
l.startStateLoop(closing, Follower)
}
} else {
l.Logger.Printf("log pending: waiting for initialization or join")
l.printf("log pending: waiting for initialization or join")
}

return nil
Expand Down Expand Up @@ -555,7 +555,7 @@ func (l *Log) writeTerm(term uint64) error {

// setTerm sets the current term and clears the vote.
func (l *Log) setTerm(term uint64) error {
l.Logger.Printf("changing term: %d => %d", l.term, term)
l.printf("changing term: %d => %d", l.term, term)

if err := l.writeTerm(term); err != nil {
return err
Expand Down Expand Up @@ -668,7 +668,7 @@ func (l *Log) Initialize() error {
// Begin state loop as leader.
l.startStateLoop(l.closing, Leader)

l.Logger.Printf("log initialize: promoted to 'leader' with cluster ID %d, log ID %d, term %d",
l.printf("log initialize: promoted to 'leader' with cluster ID %d, log ID %d, term %d",
config.ClusterID, l.id, l.term)

// Set initial configuration.
Expand All @@ -693,10 +693,14 @@ func (l *Log) trace(v ...interface{}) {
// tracef writes a formatted log message if DebugEnabled is true.
func (l *Log) tracef(msg string, v ...interface{}) {
if l.DebugEnabled {
l.Logger.Printf(msg+"\n", v...)
l.printf(msg, v...)
}
}

func (l *Log) printf(msg string, v ...interface{}) {
l.Logger.Printf(fmt.Sprintf("%s[%d]: ", l.state, l.id)+msg+"\n", v...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A worthy change. I was doing something similar when debugging myself.

}

// IsLeader returns true if the log is the current leader.
func (l *Log) IsLeader() bool {
l.lock()
Expand Down Expand Up @@ -838,7 +842,7 @@ func (l *Log) stateLoop(closing <-chan struct{}, state State, stateChanged chan
l.lock()
defer l.unlock()

l.Logger.Printf("log state change: %s => %s (term=%d)", l.state, state, l.term)
l.tracef("log state change: %s => %s (term=%d)", l.state, state, l.term)
l.state = state
l.transitioning = make(chan struct{}, 0)
transitioning = l.transitioning
Expand Down Expand Up @@ -901,7 +905,7 @@ func (l *Log) followerLoop(closing <-chan struct{}) State {

return Candidate
case hb := <-l.heartbeats:
l.tracef("followerLoop: heartbeat: term=%d, idx=%d", hb.term, hb.commitIndex)
l.tracef("followerLoop: recv heartbeat: term=%d, idx=%d", hb.term, hb.commitIndex)

// Update term, commit index & leader.
l.lock()
Expand All @@ -917,7 +921,7 @@ func (l *Log) followerLoop(closing <-chan struct{}) State {

func (l *Log) readFromLeader(wg *sync.WaitGroup) {
defer wg.Done()
l.tracef("readFromLeader:")
l.tracef("readFromLeader")

for {
select {
Expand All @@ -944,7 +948,7 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup) {
l.tracef("readFromLeader: read from: %s, id=%d, term=%d, index=%d", u.String(), id, term, lastLogIndex)
r, err := l.Transport.ReadFrom(u, id, term, lastLogIndex)
if err != nil {
l.Logger.Printf("connect stream: %s", err)
l.printf("readFromLeader: connect stream: %s", err)
time.Sleep(500 * time.Millisecond)
continue
}
Expand All @@ -958,7 +962,7 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup) {

// truncateTo removes all uncommitted entries up to index.
func (l *Log) truncateTo(index uint64) {
assert(index >= l.commitIndex, "cannot truncate to before the commit index: index=%d, commit=%d", index, l.commitIndex)
assert(index >= l.commitIndex, "cannot truncate to before the commit index: id=%d index=%d, commit=%d", l.id, index, l.commitIndex)

// Ignore if there are no entries.
// Ignore if all entries are before the index.
Expand All @@ -981,7 +985,8 @@ func (l *Log) truncateTo(index uint64) {
l.entries = l.entries[:index-emin+1]
l.lastLogIndex = index

assert(l.entries[len(l.entries)-1].Index == index, "last entry in truncation not index: emax=%d, index=%d", l.entries[len(l.entries)-1].Index, index)
assert(l.entries[len(l.entries)-1].Index == index, "last entry in truncation not index: id=%d emax=%d, index=%d",
l.id, l.entries[len(l.entries)-1].Index, index)
}

// candidateLoop requests vote from other nodes in an attempt to become leader.
Expand Down Expand Up @@ -1021,7 +1026,10 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State {
l.lock()
l.mustSetTermIfHigher(hb.term)
if hb.term >= l.term {
l.tracef("candidateLoop: new leader: old=%d new=%d", l.leaderID, hb.leaderID)
l.leaderID = hb.leaderID
l.unlock()
return Follower
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this demotion occur from the message on the l.terms channel received below?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benbjohnson -- yes, that looks correct. There should be no need to return follower here, since the case statement below should be triggered by the signal sent by mustSetTermIfHigher.

}
l.unlock()
case <-l.terms:
Expand Down Expand Up @@ -1056,7 +1064,7 @@ func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup) {
}
go func(n *ConfigNode) {
peerTerm, err := l.Transport.RequestVote(n.URL, term, id, lastLogIndex, lastLogTerm)
l.Logger.Printf("send req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (term=%d, err=%v)", term, id, lastLogIndex, lastLogTerm, peerTerm, err)
l.tracef("send req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (term=%d, err=%v)", term, id, lastLogIndex, lastLogTerm, peerTerm, err)

// If an error occurred then update term.
if err != nil {
Expand Down Expand Up @@ -1169,7 +1177,7 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup
return
}

l.tracef("send heartbeat: start: n=%d", len(config.Nodes))
l.tracef("leaderLoop: send heartbeat: start: n=%d", len(config.Nodes))

// Send heartbeats to all peers.
peerIndices := make(chan uint64, len(config.Nodes))
Expand All @@ -1183,13 +1191,13 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup
c := atomic.AddInt64(&n.HeartbeatErrorCount, 1)
// log heartbeat error once every 15 seconds to avoid flooding the logs
if time.Now().Unix()-atomic.LoadInt64(&n.LastHeartbeatError) > heartbeartErrorLogThreshold {
l.Logger.Printf("send heartbeat: error: cnt=%d %s", c, err)
l.printf("leaderLoop: send heartbeat: error: cnt=%d %s", c, err)
atomic.StoreInt64(&n.LastHeartbeatError, time.Now().Unix())
}
return
}
if atomic.LoadInt64(&n.LastHeartbeatError) != 0 {
l.Logger.Printf("send heartbeat: success url=%s", n.URL.String())
l.printf("leaderLoop: send heartbeat: success url=%s", n.URL.String())
atomic.StoreInt64(&n.LastHeartbeatError, 0)
atomic.StoreInt64(&n.HeartbeatErrorCount, 0)
}
Expand All @@ -1204,10 +1212,10 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup
for {
select {
case <-l.transitioning:
l.tracef("send heartbeat: transitioning")
l.tracef("leaderLoop: send heartbeat: transitioning")
return
case peerIndex := <-peerIndices:
l.tracef("send heartbeat: index: idx=%d, idxs=%+v", peerIndex, indexes)
l.tracef("leaderLoop: send heartbeat: index: idx=%d, idxs=%+v", peerIndex, indexes)
indexes = append(indexes, peerIndex) // collect responses
case ch := <-after:
// Once we have enough indices then return the lowest index
Expand All @@ -1217,9 +1225,9 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup
// Return highest index reported by quorum.
sort.Sort(sort.Reverse(uint64Slice(indexes)))
committed <- indexes[quorumN-1]
l.tracef("send heartbeat: commit: idx=%d, idxs=%+v", commitIndex, indexes)
l.tracef("leaderLoop: send heartbeat: commit: idx=%d, idxs=%+v", commitIndex, indexes)
} else {
l.tracef("send heartbeat: no quorum: idxs=%+v", indexes)
l.tracef("leaderLoop: send heartbeat: no quorum: idxs=%+v", indexes)
close(committed)
}
close(ch)
Expand Down Expand Up @@ -1366,7 +1374,7 @@ func (l *Log) appendToWriters(buf []byte) {

// If an error occurs then remove the writer and close it.
if _, err := w.Write(buf); err != nil {
l.Logger.Printf("append to writers error: %s", err)
l.printf("append to writers error: %s", err)
l.removeWriter(w)
i--
continue
Expand Down Expand Up @@ -1607,13 +1615,13 @@ func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64

// Check if log is closed.
if !l.opened() || l.state == Stopped {
l.Logger.Printf("recv heartbeat: closed")
l.tracef("recv heartbeat: closed")
return 0, ErrClosed
}

// Ignore if the incoming term is less than the log's term.
if term < l.term {
l.Logger.Printf("recv heartbeat: stale term, ignore: %d < %d", term, l.term)
l.tracef("recv heartbeat: stale term, ignore: %d < %d", term, l.term)
return l.lastLogIndex, ErrStaleTerm
}

Expand Down Expand Up @@ -1644,7 +1652,8 @@ func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (
}

defer func() {
l.Logger.Printf("recv req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (err=%v)", term, candidateID, lastLogIndex, lastLogTerm, err)
l.tracef("recv req vote: (term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (err=%v)",
term, candidateID, lastLogIndex, lastLogTerm, err)
}()

// Deny vote if:
Expand Down Expand Up @@ -1679,7 +1688,7 @@ func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error {
// Validate and initialize the writer.
writer, err := l.initWriter(w, id, term, index)
if err != nil {
l.Logger.Printf("unable to init writer: %s", err)
l.printf("unable to init writer: %s", err)
return err
}

Expand All @@ -1689,7 +1698,7 @@ func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error {
l.lock()
l.removeWriter(writer)
l.unlock()
l.Logger.Printf("unable to write entries: %s", err)
l.printf("unable to write entries: %s", err)
return err
}

Expand Down Expand Up @@ -1812,7 +1821,7 @@ func (l *Log) removeWriter(writer *logWriter) {
l.writers[len(l.writers)-1] = nil
l.writers = l.writers[:len(l.writers)-1]
_ = w.Close()
l.Logger.Printf("writer removed: %#v", w)
l.printf("writer removed: id=%d idx=%d", w.id, w.snapshotIndex)
break
}
}
Expand All @@ -1833,7 +1842,7 @@ func (l *Log) ReadFrom(r io.ReadCloser) error {
return nil
}

l.Logger.Printf("reading from stream")
l.tracef("reading from stream")

// Continually decode entries.
dec := NewLogEntryDecoder(r)
Expand All @@ -1852,13 +1861,13 @@ func (l *Log) ReadFrom(r io.ReadCloser) error {
case logEntryConfig:
l.tracef("ReadFrom: config")
if err := l.applyConfigLogEntry(e); err != nil {
l.Logger.Printf("error reading config from stream: %s", err)
l.printf("error reading config from stream: %s", err)
return fmt.Errorf("apply config log entry: %s", err)
}

case logEntrySnapshot:
if err := l.applySnapshotLogEntry(e, r); err != nil {
l.Logger.Printf("error snapshotting from stream: %s", err)
l.printf("error snapshotting from stream: %s", err)
return fmt.Errorf("apply snapshot log entry: %s", err)
}

Expand All @@ -1873,7 +1882,7 @@ func (l *Log) ReadFrom(r io.ReadCloser) error {

return nil
}(); err != nil {
l.Logger.Printf("error appending from stream: %s", err)
l.printf("error appending from stream: %s", err)
return err
}
}
Expand Down Expand Up @@ -1910,8 +1919,8 @@ func (l *Log) applySnapshotLogEntry(e *LogEntry, r io.Reader) error {

// Log snapshotting time.
start := time.Now()
l.Logger.Printf("applying snapshot: begin")
defer func() { l.Logger.Printf("applying snapshot: done (%s)", time.Since(start)) }()
l.printf("applying snapshot: begin")
defer func() { l.printf("applying snapshot: done (%s)", time.Since(start)) }()

// Let the FSM rebuild its state from the data in r.
if _, err := l.FSM.ReadFrom(r); err != nil {
Expand Down