From 057309fc8e103f87236388c9b9e05cc9ad71a716 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 26 Mar 2015 20:32:39 -0600 Subject: [PATCH 1/4] Simplify raft snapshotting, entry apply. --- messaging/broker.go | 58 +++++---- messaging/broker_test.go | 28 ++-- raft/internal_test.go | 19 +-- raft/log.go | 272 ++++++++++++++++++--------------------- raft/log_test.go | 33 ++--- 5 files changed, 199 insertions(+), 211 deletions(-) diff --git a/messaging/broker.go b/messaging/broker.go index d2ca0d4a02a..f647d27df7d 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -110,10 +110,10 @@ func (b *Broker) Topic(id uint64) *Topic { // Index returns the highest index seen by the broker across all topics. // Returns 0 if the broker is closed. -func (b *Broker) Index() (uint64, error) { +func (b *Broker) Index() uint64 { b.mu.RLock() defer b.mu.RUnlock() - return b.index, nil + return b.index } // opened returns true if the broker is in an open and running state. @@ -256,8 +256,8 @@ func (b *Broker) setMaxIndex(index uint64) error { return nil } -// Snapshot streams the current state of the broker and returns the index. -func (b *Broker) Snapshot(w io.Writer) (uint64, error) { +// WriteTo writes a snapshot of the broker to w. +func (b *Broker) WriteTo(w io.Writer) (int64, error) { // TODO: Prevent truncation during snapshot. // Calculate header under lock. @@ -291,8 +291,7 @@ func (b *Broker) Snapshot(w io.Writer) (uint64, error) { } } - // Return the snapshot and its last applied index. - return hdr.Index, nil + return 0, nil } // createSnapshotHeader creates a snapshot header. @@ -352,32 +351,32 @@ func copyFileN(w io.Writer, path string, n int64) (int64, error) { return io.CopyN(w, f, n) } -// Restore reads the broker state. -func (b *Broker) Restore(r io.Reader) error { +// ReadFrom reads a broker snapshot from r. +func (b *Broker) ReadFrom(r io.Reader) (int64, error) { b.mu.Lock() defer b.mu.Unlock() // Remove and recreate broker path. if err := b.reset(); err != nil && !os.IsNotExist(err) { - return fmt.Errorf("reset: %s", err) + return 0, fmt.Errorf("reset: %s", err) } else if err = os.MkdirAll(b.path, 0777); err != nil { - return fmt.Errorf("mkdir: %s", err) + return 0, fmt.Errorf("mkdir: %s", err) } // Read header frame. var sz uint32 if err := binary.Read(r, binary.BigEndian, &sz); err != nil { - return fmt.Errorf("read header size: %s", err) + return 0, fmt.Errorf("read header size: %s", err) } buf := make([]byte, sz) if _, err := io.ReadFull(r, buf); err != nil { - return fmt.Errorf("read header: %s", err) + return 0, fmt.Errorf("read header: %s", err) } // Decode header. sh := &snapshotHeader{} if err := json.Unmarshal(buf, &sh); err != nil { - return fmt.Errorf("decode header: %s", err) + return 0, fmt.Errorf("decode header: %s", err) } // Close any topics which might be open and clear them out. @@ -389,7 +388,7 @@ func (b *Broker) Restore(r io.Reader) error { // Create topic directory. if err := os.MkdirAll(t.Path(), 0777); err != nil { - return fmt.Errorf("make topic dir: %s", err) + return 0, fmt.Errorf("make topic dir: %s", err) } // Copy data from snapshot into segment files. @@ -411,24 +410,24 @@ func (b *Broker) Restore(r io.Reader) error { return nil }(); err != nil { - return err + return 0, err } } // Open topic. if err := t.Open(); err != nil { - return fmt.Errorf("open topic: %s", err) + return 0, fmt.Errorf("open topic: %s", err) } b.topics[t.id] = t } // Set the highest seen index. if err := b.setMaxIndex(sh.Index); err != nil { - return fmt.Errorf("set max index: %s", err) + return 0, fmt.Errorf("set max index: %s", err) } b.index = sh.Index - return nil + return 0, nil } // reset removes all files in the broker directory besides the raft directory. @@ -570,20 +569,21 @@ type snapshotTopicSegment struct { // It will panic for any errors that occur during Apply. type RaftFSM struct { Broker interface { + io.WriterTo + io.ReaderFrom + Apply(m *Message) error - Index() (uint64, error) + Index() uint64 SetMaxIndex(uint64) error - Snapshot(w io.Writer) (uint64, error) - Restore(r io.Reader) error } } -func (fsm *RaftFSM) Index() (uint64, error) { return fsm.Broker.Index() } -func (fsm *RaftFSM) Snapshot(w io.Writer) (uint64, error) { return fsm.Broker.Snapshot(w) } -func (fsm *RaftFSM) Restore(r io.Reader) error { return fsm.Broker.Restore(r) } +func (fsm *RaftFSM) Index() uint64 { return fsm.Broker.Index() } +func (fsm *RaftFSM) WriteTo(w io.Writer) (n int64, err error) { return fsm.Broker.WriteTo(w) } +func (fsm *RaftFSM) ReadFrom(r io.Reader) (n int64, err error) { return fsm.Broker.ReadFrom(r) } -// MustApply applies a raft command to the broker. Panic on error. -func (fsm *RaftFSM) MustApply(e *raft.LogEntry) { +// Apply applies a raft command to the broker. +func (fsm *RaftFSM) Apply(e *raft.LogEntry) error { switch e.Type { case raft.LogEntryCommand: // Decode message. @@ -595,15 +595,17 @@ func (fsm *RaftFSM) MustApply(e *raft.LogEntry) { // Apply message. if err := fsm.Broker.Apply(m); err != nil { - panic(err.Error()) + return fmt.Errorf("broker apply: %s", err) } default: // Move internal index forward if it's an internal raft comand. if err := fsm.Broker.SetMaxIndex(e.Index); err != nil { - panic(fmt.Sprintf("set max index: idx=%d, err=%s", e.Index, err)) + return fmt.Errorf("set max index: idx=%d, err=%s", e.Index, err) } } + + return nil } // DefaultMaxSegmentSize is the largest a segment can get before starting a new segment. diff --git a/messaging/broker_test.go b/messaging/broker_test.go index d0ae7765e9c..33d07e30064 100644 --- a/messaging/broker_test.go +++ b/messaging/broker_test.go @@ -103,7 +103,7 @@ func TestBroker_Apply(t *testing.T) { } // Verify broker high water mark. - if index, _ := b.Index(); index != 4 { + if index := b.Index(); index != 4 { t.Fatalf("unexpected broker index: %d", index) } } @@ -158,7 +158,7 @@ func TestBroker_Reopen(t *testing.T) { } // Verify broker high water mark. - if index, _ := b.Index(); index != 4 { + if index := b.Index(); index != 4 { t.Fatalf("unexpected broker index: %d", index) } @@ -188,16 +188,14 @@ func TestBroker_Snapshot(t *testing.T) { // Snapshot the first broker. var buf bytes.Buffer - if index, err := b0.Snapshot(&buf); err != nil { + if _, err := b0.WriteTo(&buf); err != nil { t.Fatalf("snapshot error: %s", err) - } else if index != 4 { - t.Fatalf("unexpected snapshot index: %d", index) } // Restore to the second broker. b1 := OpenBroker() defer b1.Close() - if err := b1.Restore(&buf); err != nil { + if _, err := b1.ReadFrom(&buf); err != nil { t.Fatalf("restore error: %s", err) } @@ -224,7 +222,7 @@ func TestBroker_Snapshot(t *testing.T) { } // Verify broker high water mark. - if index, _ := b1.Index(); index != 4 { + if index := b1.Index(); index != 4 { t.Fatalf("unexpected broker index: %d", index) } } @@ -268,8 +266,9 @@ func TestRaftFSM_MustApply_Message(t *testing.T) { // Encode message and apply it as a log entry. m := messaging.Message{TopicID: 20} data, _ := m.MarshalBinary() - fsm.MustApply(&raft.LogEntry{Index: 2, Data: data}) - if !called { + if err := fsm.Apply(&raft.LogEntry{Index: 2, Data: data}); err != nil { + t.Fatal(err) + } else if !called { t.Fatal("Apply() not called") } } @@ -289,8 +288,9 @@ func TestRaftFSM_MustApply_Internal(t *testing.T) { } // Encode message and apply it as a log entry. - fsm.MustApply(&raft.LogEntry{Type: raft.LogEntryAddPeer, Index: 2}) - if !called { + if err := fsm.Apply(&raft.LogEntry{Type: raft.LogEntryAddPeer, Index: 2}); err != nil { + t.Fatal(err) + } else if !called { t.Fatal("Apply() not called") } } @@ -318,9 +318,9 @@ type RaftFSMBroker struct { func (b *RaftFSMBroker) Apply(m *messaging.Message) error { return b.ApplyFunc(m) } func (b *RaftFSMBroker) SetMaxIndex(index uint64) error { return b.SetMaxIndexFunc(index) } -func (b *RaftFSMBroker) Index() (uint64, error) { return 0, nil } -func (b *RaftFSMBroker) Snapshot(w io.Writer) (uint64, error) { return 0, nil } -func (b *RaftFSMBroker) Restore(r io.Reader) error { return nil } +func (b *RaftFSMBroker) Index() uint64 { return 0 } +func (b *RaftFSMBroker) WriteTo(w io.Writer) (n int64, err error) { return 0, nil } +func (b *RaftFSMBroker) ReadFrom(r io.Reader) (n int64, err error) { return 0, nil } // Ensure a list of topics can be read from a directory. func TestReadTopics(t *testing.T) { diff --git a/raft/internal_test.go b/raft/internal_test.go index a84cf0bd641..fee146988a0 100644 --- a/raft/internal_test.go +++ b/raft/internal_test.go @@ -48,19 +48,22 @@ type IndexFSM struct { } // MustApply updates the index. -func (fsm *IndexFSM) MustApply(entry *LogEntry) { fsm.index = entry.Index } +func (fsm *IndexFSM) Apply(entry *LogEntry) error { + fsm.index = entry.Index + return nil +} // Index returns the highest applied index. -func (fsm *IndexFSM) Index() (uint64, error) { return fsm.index, nil } +func (fsm *IndexFSM) Index() uint64 { return fsm.index } -// Snapshot writes the FSM's index as the snapshot. -func (fsm *IndexFSM) Snapshot(w io.Writer) (uint64, error) { - return fsm.index, binary.Write(w, binary.BigEndian, fsm.index) +// WriteTo writes a snapshot of the FSM to w. +func (fsm *IndexFSM) WriteTo(w io.Writer) (n int64, err error) { + return 0, binary.Write(w, binary.BigEndian, fsm.index) } -// Restore reads the snapshot from the reader. -func (fsm *IndexFSM) Restore(r io.Reader) error { - return binary.Read(r, binary.BigEndian, &fsm.index) +// ReadFrom reads an FSM snapshot from r. +func (fsm *IndexFSM) ReadFrom(r io.Reader) (n int64, err error) { + return 0, binary.Read(r, binary.BigEndian, &fsm.index) } // tempfile returns the path to a non-existent file in the temp directory. diff --git a/raft/log.go b/raft/log.go index f488ea3eeac..7e596afad13 100644 --- a/raft/log.go +++ b/raft/log.go @@ -25,19 +25,15 @@ import ( // FSM represents the state machine that the log is applied to. // The FSM must maintain the highest index that it has seen. type FSM interface { + io.WriterTo + io.ReaderFrom + // Executes a log entry against the state machine. // Non-repeatable errors such as system and disk errors must panic. - MustApply(*LogEntry) - - // Returns the highest index saved to the state machine. - Index() (uint64, error) - - // Writes a snapshot of the entire state machine to a writer. - // Returns the index at the point in time of the snapshot. - Snapshot(w io.Writer) (index uint64, err error) + Apply(*LogEntry) error - // Reads a snapshot of the entire state machine. - Restore(r io.Reader) error + // Returns the applied index saved to the state machine. + Index() uint64 } const logEntryHeaderSize = 8 + 8 + 8 // sz+index+term @@ -91,8 +87,7 @@ type Log struct { votedFor uint64 // candidate voted for in current election term lastContact time.Time // last contact from the leader - commitIndex uint64 // highest entry to be committed - appliedIndex uint64 // highest entry to applied to state machine + commitIndex uint64 // highest entry to be committed reader io.ReadCloser // incoming stream from leader writers []*logWriter // outgoing streams to followers @@ -231,13 +226,6 @@ func (l *Log) CommitIndex() uint64 { return l.commitIndex } -// AppliedIndex returns the highest applied index. -func (l *Log) AppliedIndex() uint64 { - l.mu.Lock() - defer l.mu.Unlock() - return l.appliedIndex -} - // Term returns the current term. func (l *Log) Term() uint64 { l.mu.Lock() @@ -299,13 +287,9 @@ func (l *Log) Open(path string) error { l.config = c // Determine last applied index from FSM. - index, err := l.FSM.Index() - if err != nil { - return err - } + index := l.FSM.Index() l.tracef("Open: fsm: index=%d", index) l.lastLogIndex = index - l.appliedIndex = index l.commitIndex = index // Start goroutine to apply logs. @@ -1095,18 +1079,18 @@ func (l *Log) internalApply(typ LogEntryType, command []byte) (index uint64, err } // Wait blocks until a given index is applied. -func (l *Log) Wait(index uint64) error { +func (l *Log) Wait(idx uint64) error { // TODO(benbjohnson): Check for leadership change (?). // TODO(benbjohnson): Add timeout. for { l.mu.Lock() - state, appliedIndex := l.state, l.appliedIndex + state, index := l.state, l.FSM.Index() l.mu.Unlock() if state == Stopped { return ErrClosed - } else if appliedIndex >= index { + } else if index >= idx { return nil } time.Sleep(WaitInterval) @@ -1192,109 +1176,100 @@ func (l *Log) applier(closing <-chan struct{}) { //l.tracef("applier") - // Apply all entries committed since the previous apply. - err := func() error { - l.mu.Lock() - defer l.mu.Unlock() - - // Verify, under lock, that we're not closing. - select { - case <-closing: - return nil - default: + // Keep applying the next entry until there are no more committed + // entries that have not been applied to the state machine. + for { + if err := l.applyNextUnappliedEntry(closing); err == errDone { + break + } else if err != nil { + panic(err.Error()) } + } - // Ignore if there are no pending entries. - // Ignore if all entries are applied. - if len(l.entries) == 0 { - //l.tracef("applier: no entries") - return nil - } else if l.appliedIndex == l.commitIndex { - //l.tracef("applier: up to date") - return nil - } + // Trim entries. + l.mu.Lock() + l.trim() + l.mu.Unlock() - // Determine the available range of indices on the log. - entmin, entmax := l.entries[0].Index, l.entries[len(l.entries)-1].Index - assert(entmin <= entmax, "apply: log out of order: min=%d, max=%d", entmin, entmax) - assert(uint64(len(l.entries)) == (entmax-entmin+1), "apply: missing entries: min=%d, max=%d, len=%d", entmin, entmax, len(l.entries)) - - // Determine the range of indices that should be processed. - // This should be the entry after the last applied index through to - // the committed index. - nextUnappliedIndex, commitIndex := l.appliedIndex+1, l.commitIndex - l.tracef("applier: entries: available=%d-%d, [next,commit]=%d-%d", entmin, entmax, nextUnappliedIndex, commitIndex) - assert(nextUnappliedIndex <= commitIndex, "next unapplied index after commit index: next=%d, commit=%d", nextUnappliedIndex, commitIndex) - - // Determine the lowest index to start from. - // This should be the next entry after the last applied entry. - // Ignore if we don't have any entries after the last applied yet. - assert(entmin <= nextUnappliedIndex, "apply: missing entries: min=%d, next=%d", entmin, nextUnappliedIndex) - if nextUnappliedIndex > entmax { - return nil - } - imin := nextUnappliedIndex - - // Determine the highest index to go to. - // This should be the committed index. - // If we haven't yet received the committed index then go to the last available. - var imax uint64 - if commitIndex <= entmax { - imax = commitIndex - } else { - imax = entmax - } + // Signal clock that apply is done. + close(confirm) + } +} - // Determine entries to apply. - l.tracef("applier: entries: available=%d-%d, applying=%d-%d", entmin, entmax, imin, imax) - entries := l.entries[imin-entmin : imax-entmin+1] +// applyNextUnappliedEntry applies the next committed entry that has not yet been applied. +func (l *Log) applyNextUnappliedEntry(closing <-chan struct{}) error { + l.mu.Lock() + defer l.mu.Unlock() - // Determine low water mark for entries to cut off. - for _, w := range l.writers { - if w.snapshotIndex > 0 && w.snapshotIndex < imax { - imax = w.snapshotIndex - } - } - l.entries = l.entries[imax-entmin:] - - // Iterate over each entry and apply it. - for _, e := range entries { - // l.tracef("applier: entry: idx=%d", e.Index) - - switch e.Type { - case LogEntryCommand, LogEntryNop: - case LogEntryInitialize: - l.mustApplyInitialize(e) - case LogEntryAddPeer: - l.mustApplyAddPeer(e) - case LogEntryRemovePeer: - l.mustApplyRemovePeer(e) - default: - panic("unsupported command type: " + strconv.Itoa(int(e.Type))) - } + // Verify, under lock, that we're not closing. + select { + case <-closing: + return errDone + default: + } - // Apply to FSM. - if e.Index > 0 { - l.FSM.MustApply(e) - } + // Ignore if there are no entries in the log. + if len(l.entries) == 0 { + return errDone + } - // Increment applied index. - l.appliedIndex++ - } + // Determine next index to apply. + // Ignore if next index is after the commit index. + // Ignore if the entry is not streamed to the log yet. + index := l.FSM.Index() + 1 + if index > l.commitIndex { + return errDone + } else if index > l.entries[len(l.entries)-1].Index { + return errDone + } - return nil - }() + // Retrieve next entry. + e := l.entries[index-l.entries[0].Index] + assert(e.Index == index, "apply: index mismatch: %d != %d", e.Index, index) - // If error occurred then log it. - // The log will retry after a given timeout. - if err != nil { - l.Logger.Printf("apply error: %s", err) - // TODO(benbjohnson): Longer timeout before retry? + // Special handling for internal log entries. + switch e.Type { + case LogEntryCommand, LogEntryNop: + case LogEntryInitialize: + l.mustApplyInitialize(e) + case LogEntryAddPeer: + l.mustApplyAddPeer(e) + case LogEntryRemovePeer: + l.mustApplyRemovePeer(e) + default: + return fmt.Errorf("unsupported command type: %d", e.Type) + } + + // Apply to FSM. + if err := l.FSM.Apply(e); err != nil { + return fmt.Errorf("apply: %s", err) + } + + return nil +} + +// trim truncates the log based on the applied index and pending writers. +func (l *Log) trim() { + if len(l.entries) == 0 { + return + } + + // Determine lowest index to trim to. + index := l.FSM.Index() + for _, w := range l.writers { + if w.snapshotIndex > 0 && w.snapshotIndex < index { + index = w.snapshotIndex } + } - // Signal clock that apply is done. - close(confirm) + // Ignore if the index is lower than the first entry. + // This can occur on a new snapshot. + if index < l.entries[0].Index { + return } + + // Reslice entries list. + l.entries = l.entries[index-l.entries[0].Index:] } // mustApplyInitialize a log initialization command by parsing and setting the configuration. @@ -1492,19 +1467,18 @@ func (l *Log) writeTo(writer *logWriter, id, term, index uint64) error { } // Begin streaming the snapshot. - snapshotIndex, err := l.FSM.Snapshot(w) - if err != nil { + if _, err := l.FSM.WriteTo(w); err != nil { return err } - - // Write snapshot index at the end and flush. - if err := binary.Write(w, binary.BigEndian, snapshotIndex); err != nil { - return fmt.Errorf("write snapshot index: %s", err) - } flushWriter(w) + // // Write snapshot index at the end and flush. + // if err := binary.Write(w, binary.BigEndian, snapshotIndex); err != nil { + // return fmt.Errorf("write snapshot index: %s", err) + // } + // Write entries since the snapshot occurred and begin tailing writer. - if err := l.advanceWriter(writer, snapshotIndex); err != nil { + if err := l.advanceWriter(writer); err != nil { return err } @@ -1553,7 +1527,7 @@ func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error writer := &logWriter{ Writer: w, id: id, - snapshotIndex: l.appliedIndex, + snapshotIndex: l.FSM.Index(), done: make(chan struct{}), } l.writers = append(l.writers, writer) @@ -1562,7 +1536,7 @@ func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error } // replays entries since the snapshot's index and begins tailing the log. -func (l *Log) advanceWriter(writer *logWriter, snapshotIndex uint64) error { +func (l *Log) advanceWriter(writer *logWriter) error { l.mu.Lock() defer l.mu.Unlock() @@ -1575,9 +1549,8 @@ func (l *Log) advanceWriter(writer *logWriter, snapshotIndex uint64) error { // Write pending entries. if len(l.entries) > 0 { - startIndex := l.entries[0].Index enc := NewLogEntryEncoder(writer.Writer) - for _, e := range l.entries[snapshotIndex-startIndex+1:] { + for _, e := range l.entries[writer.snapshotIndex-l.entries[0].Index+1:] { if err := enc.Encode(e); err != nil { return err } @@ -1663,25 +1636,34 @@ func (l *Log) ReadFrom(r io.ReadCloser) error { if e.Type == logEntrySnapshot { l.tracef("ReadFrom: snapshot") - if err := l.FSM.Restore(r); err != nil { + if err := func() error { + l.mu.Lock() + defer l.mu.Unlock() + + if _, err := l.FSM.ReadFrom(r); err != nil { + return err + } + + // Update the indicies & clear the entries. + index := l.FSM.Index() + l.lastLogIndex = index + l.commitIndex = index + l.entries = nil + + return nil + }(); err != nil { + l.tracef("ReadFrom: restore error: %s", err) return err } - l.tracef("ReadFrom: snapshot: restored") - // Read the snapshot index off the end of the snapshot. - var index uint64 - if err := binary.Read(r, binary.BigEndian, &index); err != nil { - return fmt.Errorf("read snapshot index: %s", err) - } - l.tracef("ReadFrom: snapshot: index=%d", index) + l.tracef("ReadFrom: snapshot: restored") - // Update the indicies & clear the entries. - l.mu.Lock() - l.lastLogIndex = index - l.commitIndex = index - l.appliedIndex = index - l.entries = nil - l.mu.Unlock() + // // Read the snapshot index off the end of the snapshot. + // var index uint64 + // if err := binary.Read(r, binary.BigEndian, &index); err != nil { + // return fmt.Errorf("read snapshot index: %s", err) + // } + // l.tracef("ReadFrom: snapshot: index=%d", index) continue } diff --git a/raft/log_test.go b/raft/log_test.go index 92145b8cdd9..dc1b20798f1 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -249,7 +249,7 @@ func TestCluster_Elect_RealTime(t *testing.T) { // Create a cluster with a real-time clock. c := NewRealTimeCluster(3, indexFSMFunc) - minIndex := c.Logs[0].AppliedIndex() + minIndex := c.Logs[0].FSM.Index() commandN := uint64(1000) - minIndex // Run a loop to continually apply commands. @@ -297,7 +297,7 @@ func TestCluster_Elect_RealTime(t *testing.T) { // Verify FSM indicies match. for i, l := range c.Logs { - fsmIndex, _ := l.FSM.(*raft.IndexFSM).Index() + fsmIndex := l.FSM.Index() if exp := commandN + minIndex; exp != fsmIndex { t.Errorf("fsm index mismatch(%d): exp=%d, got=%d", i, exp, fsmIndex) } @@ -334,7 +334,7 @@ func benchmarkClusterApply(b *testing.B, logN int) { // Verify FSM indicies match. for i, l := range c.Logs { - fsmIndex, _ := l.FSM.(*raft.IndexFSM).Index() + fsmIndex := l.FSM.Index() if index != fsmIndex { b.Errorf("fsm index mismatch(%d): exp=%d, got=%d", i, index, fsmIndex) } @@ -592,39 +592,40 @@ type FSM struct { Commands [][]byte } -// MustApply updates the max index and appends the command. -func (fsm *FSM) MustApply(entry *raft.LogEntry) { +// Apply updates the max index and appends the command. +func (fsm *FSM) Apply(entry *raft.LogEntry) error { fsm.MaxIndex = entry.Index if entry.Type == raft.LogEntryCommand { fsm.Commands = append(fsm.Commands, entry.Data) } + return nil } // Index returns the highest applied index. -func (fsm *FSM) Index() (uint64, error) { return fsm.MaxIndex, nil } +func (fsm *FSM) Index() uint64 { return fsm.MaxIndex } -// Snapshot begins writing the FSM to a writer. -func (fsm *FSM) Snapshot(w io.Writer) (uint64, error) { +// WriteTo writes a snapshot of the FSM to w. +func (fsm *FSM) WriteTo(w io.Writer) (n int64, err error) { b, _ := json.Marshal(fsm) binary.Write(w, binary.BigEndian, uint64(len(b))) - _, err := w.Write(b) - return fsm.MaxIndex, err + _, err = w.Write(b) + return 0, err } -// Restore reads the snapshot from the reader. -func (fsm *FSM) Restore(r io.Reader) error { +// ReadFrom reads an FSM snapshot from r. +func (fsm *FSM) ReadFrom(r io.Reader) (n int64, err error) { var sz uint64 if err := binary.Read(r, binary.BigEndian, &sz); err != nil { - return err + return 0, err } buf := make([]byte, sz) if _, err := io.ReadFull(r, buf); err != nil { - return err + return 0, err } if err := json.Unmarshal(buf, &fsm); err != nil { - return err + return 0, err } - return nil + return 0, nil } func fsmFunc() raft.FSM { return &FSM{} } From 3d03602fa766fc81e387e1b4564a68aceb6547df Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 29 Mar 2015 09:08:44 -0600 Subject: [PATCH 2/4] Raft documentation, read from index fix. --- raft/log.go | 503 ++++++++++++++++++++++++----------------- raft/log_test.go | 167 +++++++++++++- raft/transport_test.go | 17 ++ 3 files changed, 481 insertions(+), 206 deletions(-) diff --git a/raft/log.go b/raft/log.go index 7e596afad13..6b92ec9a0f5 100644 --- a/raft/log.go +++ b/raft/log.go @@ -67,35 +67,96 @@ const ( Leader ) -// Log represents a replicated log of commands. +// Log represents a replicated log of commands based on the Raft protocol. +// +// The log can exist in one of four states that transition based on the following rules: +// +// ┌───────────┐ +// ┌─▶│ Stopped │ +// │ └───────────┘ +// │ │ +// │ ▼ +// │ ┌───────────┐ +// ├──│ Follower │◀─┐ +// │ └───────────┘ │ +// close │ │ │ +// log │ ▼ │ +// │ ┌───────────┐ │ +// ├──│ Candidate │──┤ higher +// │ └───────────┘ │ term +// │ │ │ +// │ ▼ │ +// │ ┌───────────┐ │ +// └──│ Leader │──┘ +// └───────────┘ +// +// - Stopped moves to Follower when initialized or joined. +// - Follower moves to Candidate after election timeout. +// - Candidate moves to Leader after a quorum of votes. +// - Leader or Candidate moves to Follower if higher term seen. +// - Any state moves to Stopped if log is closed. type Log struct { mu sync.Mutex - id uint64 // log identifier - path string // data directory - config *Config // cluster configuration + // The directory where the id, term and config are written to. + path string - state State // current node state - heartbeats chan heartbeat // incoming heartbeat channel - terms chan uint64 // incoming channel of newer terms + // The log identifier. This is set when the log initializes + // or when the log joins to another cluster. + id uint64 - lastLogTerm uint64 // highest term in the log - lastLogIndex uint64 // highest index in the log + // Config stores all nodes in the cluster. + config *Config - term uint64 // current election term - leaderID uint64 // the current leader - votedFor uint64 // candidate voted for in current election term - lastContact time.Time // last contact from the leader + // The ID of the current leader. + leaderID uint64 - commitIndex uint64 // highest entry to be committed - - reader io.ReadCloser // incoming stream from leader - writers []*logWriter // outgoing streams to followers + // Current state of the log. + // The following state transitions can occur: + state State + // In-memory log entries. + // Followers replicate these entries from the Leader. + // Leader appends to the end of these entries. + // Truncated and trimmed as needed. entries []*LogEntry - wg sync.WaitGroup // pending goroutines - closing chan struct{} // close notification + // Highest term & index in the log. + // These are initialially read from the id/term files but otherwise + // should always match the index/term of the last 'entries' element. + lastLogTerm uint64 + lastLogIndex uint64 + + // Highest entry to be committed. + // An entry can be committed once a quorum of nodes have received the entry. + // Because streaming raft asyncronously replicates entries, the lastLogIndex + // may be lower than the commitIndex. The commitIndex is always higher than + // or equal to the FSM.Index(). + commitIndex uint64 + + // The current term the log is in. This increases when the log starts a + // new election term or when the log sees a higher election term. + term uint64 + + // The node this log voted for in the current term. + votedFor uint64 + + // Incoming stream from the leader. + // This is disconnected when the leader is deposed or the log changes state. + reader io.ReadCloser + + // Outgoing streams to the followers to replicate the log. + // These are closed when the leader is deposed. + writers []*logWriter // outgoing streams to followers + + // Incoming heartbeats and term changes go to these channels + // and are picked up by the current state. + heartbeats chan heartbeat + terms chan uint64 + + // Close notification and wait. + wg sync.WaitGroup + closing chan struct{} // Network address to the reach the log. url url.URL @@ -147,7 +208,11 @@ func NewLog() *Log { // Path returns the data path of the Raft log. // Returns an empty string if the log is closed. -func (l *Log) Path() string { return l.path } +func (l *Log) Path() string { + l.mu.Lock() + defer l.mu.Unlock() + return l.path +} // URL returns the URL for the log. func (l *Log) URL() url.URL { @@ -160,11 +225,7 @@ func (l *Log) URL() url.URL { func (l *Log) SetURL(u url.URL) { l.mu.Lock() defer l.mu.Unlock() - - if l.opened() { - panic("url cannot be set while log is open") - } - + assert(!l.opened(), "url cannot be set while log is open") l.url = u } @@ -246,69 +307,82 @@ func (l *Log) Config() *Config { // Open initializes the log from a path. // If the path does not exist then it is created. func (l *Log) Open(path string) error { - l.mu.Lock() - defer l.mu.Unlock() + var closing chan struct{} + var config *Config + if err := func() error { + l.mu.Lock() + defer l.mu.Unlock() - // Validate initial log state. - if l.opened() { - return ErrOpen - } + // Validate initial log state. + if l.opened() { + return ErrOpen + } - // Create directory, if not exists. - if err := os.MkdirAll(path, 0755); err != nil { - return err - } - l.path = path + // Create directory, if not exists. + if err := os.MkdirAll(path, 0755); err != nil { + return err + } + l.path = path - // Initialize log identifier. - id, err := l.readID() - if err != nil { - _ = l.close() - return err - } - l.setID(id) + // Initialize log identifier. + id, err := l.readID() + if err != nil { + return fmt.Errorf("read id: %s", err) + } + l.setID(id) - // Initialize log term. - term, err := l.readTerm() - if err != nil { - _ = l.close() - return err - } - l.term = term - l.votedFor = 0 - l.lastLogTerm = term + // Initialize log term. + term, err := l.readTerm() + if err != nil { + return fmt.Errorf("read term: %s", err) + } + l.term = term + l.votedFor = 0 + l.lastLogTerm = term - // Read config. - c, err := l.readConfig() - if err != nil { - _ = l.close() - return fmt.Errorf("read config: %s", err) - } - l.config = c + // Read config. + c, err := l.readConfig() + if err != nil { + return fmt.Errorf("read config: %s", err) + } + l.config = c - // Determine last applied index from FSM. - index := l.FSM.Index() - l.tracef("Open: fsm: index=%d", index) - l.lastLogIndex = index - l.commitIndex = index + // Determine last applied index from FSM. + index := l.FSM.Index() + l.tracef("Open: fsm: index=%d", index) + l.lastLogIndex = index + l.commitIndex = index - // Start goroutine to apply logs. - l.wg.Add(1) - l.closing = make(chan struct{}) - go l.applier(l.closing) + // Start goroutine to apply logs. + l.wg.Add(1) + l.closing = make(chan struct{}) + go l.applier(l.closing) - // If a log exists then start the state loop. - if c != nil { - l.Logger.Printf("log open: created at %s, with ID %d, term %d, last applied index of %d", - path, l.id, l.term, l.lastLogIndex) + if l.config != nil { + l.Logger.Printf("log open: created at %s, with ID %d, term %d, last applied index of %d", path, l.id, l.term, l.lastLogIndex) + } + // Retrieve variables to use while starting state loop. + config = l.config + closing = l.closing + + return nil + }(); err == ErrOpen { + return err + } else if err != nil { + _ = l.close() + return err + } + + // If a log exists then start the state loop. + if config != nil { // If the config only has one node then start it as the leader. // Otherwise start as a follower. - if len(c.Nodes) == 1 && c.Nodes[0].ID == l.id { + if len(config.Nodes) == 1 && config.Nodes[0].ID == l.ID() { l.Logger.Println("log open: promoting to leader immediately") - l.startStateLoop(l.closing, Leader) + l.startStateLoop(closing, Leader) } else { - l.startStateLoop(l.closing, Follower) + l.startStateLoop(closing, Follower) } } else { l.Logger.Printf("log pending: waiting for initialization or join") @@ -462,7 +536,7 @@ func (l *Log) writeConfig(config *Config) error { // Returns an error if log data already exists. func (l *Log) Initialize() error { var config *Config - err := func() error { + if err := func() error { l.mu.Lock() defer l.mu.Unlock() @@ -499,18 +573,17 @@ func (l *Log) Initialize() error { l.lastLogTerm = term l.leaderID = l.id - // Begin state loop as leader. - l.startStateLoop(l.closing, Leader) - - l.Logger.Printf("log initialize: promoted to 'leader' with cluster ID %d, log ID %d, term %d", - config.ClusterID, l.id, l.term) - return nil - }() - if err != nil { + }(); err != nil { return err } + // Begin state loop as leader. + l.startStateLoop(l.closing, Leader) + + l.Logger.Printf("log initialize: promoted to 'leader' with cluster ID %d, log ID %d, term %d", + config.ClusterID, l.id, l.term) + // Set initial configuration. var buf bytes.Buffer _ = NewConfigEncoder(&buf).Encode(config) @@ -621,20 +694,26 @@ func (l *Log) Join(u url.URL) error { l.tracef("Join: confirmed") // Lock once the join request is returned. - l.mu.Lock() - defer l.mu.Unlock() + if err := func() error { + l.mu.Lock() + defer l.mu.Unlock() - // Write identifier. - if err := l.writeID(id); err != nil { - return err - } - l.setID(id) + // Write identifier. + if err := l.writeID(id); err != nil { + return err + } + l.setID(id) - // Write config. - if err := l.writeConfig(config); err != nil { + // Write config. + if err := l.writeConfig(config); err != nil { + return err + } + l.config = config + + return nil + }(); err != nil { return err } - l.config = config // Begin state loop as follower. l.startStateLoop(l.closing, Follower) @@ -672,14 +751,18 @@ func (l *Log) startStateLoop(closing <-chan struct{}, state State) { func (l *Log) stateLoop(closing <-chan struct{}, state State, stateChanged chan struct{}) { defer l.wg.Done() - l.Logger.Printf("log state change: %s => %s", l.state, state) - l.state = state - close(stateChanged) - for { // Transition to new state. l.Logger.Printf("log state change: %s => %s", l.state, state) + l.mu.Lock() l.state = state + l.mu.Unlock() + + // Notify caller on first state changes. + if stateChanged != nil { + close(stateChanged) + stateChanged = nil + } // Remove previous reader, if one exists. if l.reader != nil { @@ -753,9 +836,9 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup, transitioning <-chan struct{}) default: } - // Retrieve the term, last commit index, & leader URL. + // Retrieve the term, last log index, & leader URL. l.mu.Lock() - id, commitIndex, term := l.id, l.commitIndex, l.term + id, lastLogIndex, term := l.id, l.lastLogIndex, l.term _, u := l.leader() l.mu.Unlock() @@ -767,8 +850,8 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup, transitioning <-chan struct{}) } // Connect to leader. - l.tracef("readFromLeader: read from: %s, id=%d, term=%d, index=%d", u.String(), id, term, commitIndex) - r, err := l.Transport.ReadFrom(u, id, term, commitIndex) + l.tracef("readFromLeader: read from: %s, id=%d, term=%d, index=%d", u.String(), id, term, lastLogIndex) + r, err := l.Transport.ReadFrom(u, id, term, lastLogIndex) if err != nil { l.Logger.Printf("connect stream: %s", err) } @@ -787,8 +870,9 @@ func (l *Log) truncate() { } entmin := l.entries[0].Index + l.tracef("trunc: min=%d, commit=%d", l.commitIndex, entmin) assert(l.commitIndex >= entmin, "commit index before lowest entry: commit=%d, entmin=%d", l.commitIndex, entmin) - l.entries = l.entries[:l.commitIndex-entmin] + l.entries = l.entries[:l.commitIndex-entmin-1] l.lastLogIndex = l.commitIndex } @@ -953,7 +1037,7 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State { } } -// heartbeater continuoally sends heartbeats to all peers. +// heartbeater continually sends heartbeats to all peers. func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup, transitioning <-chan struct{}) { defer wg.Done() @@ -973,7 +1057,7 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup return } - l.tracef("heartbeater: start: n=%d", len(config.Nodes)) + l.tracef("send heartbeat: start: n=%d", len(config.Nodes)) // Send heartbeats to all peers. peerIndices := make(chan uint64, len(config.Nodes)) @@ -982,11 +1066,9 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup continue } go func(n *ConfigNode) { - //l.tracef("heartbeater: send: url=%s, term=%d, commit=%d, leaderID=%d", n.URL, term, commitIndex, leaderID) peerIndex, err := l.Transport.Heartbeat(n.URL, term, commitIndex, leaderID) - //l.tracef("heartbeater: recv: url=%s, peerIndex=%d, err=%s", n.URL, peerIndex, err) if err != nil { - l.Logger.Printf("heartbeater: error: %s", err) + l.Logger.Printf("send heartbeat: error: %s", err) return } peerIndices <- peerIndex @@ -1000,10 +1082,10 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup for { select { case <-transitioning: - l.tracef("heartbeater: transitioning") + l.tracef("send heartbeat: transitioning") return case peerIndex := <-peerIndices: - l.tracef("heartbeater: index: idx=%d, idxs=%+v", peerIndex, indexes) + l.tracef("send heartbeat: index: idx=%d, idxs=%+v", peerIndex, indexes) indexes = append(indexes, peerIndex) // collect responses case ch := <-after: // Once we have enough indices then return the lowest index @@ -1013,9 +1095,9 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup // Return highest index reported by quorum. sort.Sort(sort.Reverse(uint64Slice(indexes))) committed <- indexes[quorumN-1] - l.tracef("heartbeater: commit: idx=%d, idxs=%+v", commitIndex, indexes) + l.tracef("send heartbeat: commit: idx=%d, idxs=%+v", commitIndex, indexes) } else { - l.tracef("heartbeater: no quorum: idxs=%+v", indexes) + l.tracef("send heartbeat: no quorum: idxs=%+v", indexes) close(committed) } close(ch) @@ -1050,16 +1132,13 @@ func (l *Log) internalApply(typ LogEntryType, command []byte) (index uint64, err l.mu.Lock() defer l.mu.Unlock() - // Do not apply if this node is closed. // Do not apply if this node is not the leader. - if l.state == Stopped { - return 0, ErrClosed - } else if l.state != Leader { + if l.state != Leader { return 0, ErrNotLeader } // Create log entry. - e := LogEntry{ + e := &LogEntry{ Type: typ, Index: l.lastLogIndex + 1, Term: l.term, @@ -1068,7 +1147,9 @@ func (l *Log) internalApply(typ LogEntryType, command []byte) (index uint64, err index = e.Index // Append to the log. - l.append(&e) + if err := l.append(e); err != nil { + return 0, fmt.Errorf("append: %s", err) + } // If there is no config or only one node then move commit index forward. if l.config == nil || len(l.config.Nodes) <= 1 { @@ -1129,9 +1210,16 @@ func (l *Log) waitUncommitted(index uint64) error { } // append adds a log entry to the list of entries. -func (l *Log) append(e *LogEntry) { - //l.tracef("append: idx=%d, prev=%d", e.Index, l.lastLogIndex) - assert(e.Index == l.lastLogIndex+1, "non-contiguous log index(%d): idx=%d, prev=%d", l.id, e.Index, l.lastLogIndex) +func (l *Log) append(e *LogEntry) error { + // Exit if log is not in a running state. + // Ignore replayed entries. + if l.state == Stopped { + return ErrClosed + } else if e.Index <= l.lastLogIndex { + return nil + } + + assert(e.Index == l.lastLogIndex+1, "log entry skipped(%d): idx=%d, prev=%d", l.id, e.Index, l.lastLogIndex) // Encode entry to a byte slice. buf := make([]byte, logEntryHeaderSize+len(e.Data)) @@ -1144,6 +1232,13 @@ func (l *Log) append(e *LogEntry) { l.lastLogTerm = e.Term // Write to tailing writers. + l.appendToWriters(buf) + + return nil +} + +// appendToWriters writes a byte slice to all attached writers. +func (l *Log) appendToWriters(buf []byte) { for i := 0; i < len(l.writers); i++ { w := l.writers[i] @@ -1380,13 +1475,13 @@ func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64 // Check if log is closed. if !l.opened() || l.state == Stopped { - l.tracef("Heartbeat: closed") + l.tracef("recv heartbeat: closed") return 0, ErrClosed } // Ignore if the incoming term is less than the log's term. if term < l.term { - l.tracef("HB: stale term, ignore: %d < %d", term, l.term) + l.tracef("recv heartbeat: stale term, ignore: %d < %d", term, l.term) return l.lastLogIndex, ErrStaleTerm } @@ -1396,7 +1491,7 @@ func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64 default: } - l.tracef("HB: (term=%d, commit=%d, leaderID: %d) (index=%d, term=%d)", term, commitIndex, leaderID, l.lastLogIndex, l.term) + l.tracef("recv heartbeat: (term=%d, commit=%d, leaderID: %d) (index=%d, term=%d)", term, commitIndex, leaderID, l.lastLogIndex, l.term) return l.lastLogIndex, nil } @@ -1457,34 +1552,6 @@ func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error { return nil } -func (l *Log) writeTo(writer *logWriter, id, term, index uint64) error { - // Extract the underlying writer. - w := writer.Writer - - // Write snapshot marker byte. - if _, err := w.Write([]byte{logEntrySnapshot}); err != nil { - return err - } - - // Begin streaming the snapshot. - if _, err := l.FSM.WriteTo(w); err != nil { - return err - } - flushWriter(w) - - // // Write snapshot index at the end and flush. - // if err := binary.Write(w, binary.BigEndian, snapshotIndex); err != nil { - // return fmt.Errorf("write snapshot index: %s", err) - // } - - // Write entries since the snapshot occurred and begin tailing writer. - if err := l.advanceWriter(writer); err != nil { - return err - } - - return nil -} - // validates writer and adds it to the list of writers. func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error) { l.mu.Lock() @@ -1498,7 +1565,7 @@ func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error // Do not begin streaming if: // 1. Node is not the leader. // 2. Term is after current term. - // 3. Index is after the commit index. + // 3. Index is after the last entry's index. if l.state != Leader { return nil, ErrNotLeader } else if term > l.term { @@ -1508,11 +1575,9 @@ func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error } return nil, ErrNotLeader } else if index > l.lastLogIndex { - return nil, ErrUncommittedIndex + return nil, fmt.Errorf("entry not found: follower=%d, leader=%d", index, l.lastLogIndex) } - // OPTIMIZE(benbjohnson): Create buffered output to prevent blocking. - // Write configuration. var buf bytes.Buffer err := NewConfigEncoder(&buf).Encode(l.config) @@ -1535,6 +1600,34 @@ func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error return writer, nil } +func (l *Log) writeTo(writer *logWriter, id, term, index uint64) error { + // Extract the underlying writer. + w := writer.Writer + + // Write snapshot marker byte. + if _, err := w.Write([]byte{logEntrySnapshot}); err != nil { + return err + } + + // Begin streaming the snapshot. + if _, err := l.FSM.WriteTo(w); err != nil { + return err + } + flushWriter(w) + + // // Write snapshot index at the end and flush. + // if err := binary.Write(w, binary.BigEndian, snapshotIndex); err != nil { + // return fmt.Errorf("write snapshot index: %s", err) + // } + + // Write entries since the snapshot occurred and begin tailing writer. + if err := l.advanceWriter(writer); err != nil { + return err + } + + return nil +} + // replays entries since the snapshot's index and begins tailing the log. func (l *Log) advanceWriter(writer *logWriter) error { l.mu.Lock() @@ -1606,78 +1699,80 @@ func (l *Log) ReadFrom(r io.ReadCloser) error { dec := NewLogEntryDecoder(r) for { // Decode single entry. - var e LogEntry - if err := dec.Decode(&e); err == io.EOF { + e := &LogEntry{} + if err := dec.Decode(e); err == io.EOF { return nil } else if err != nil { return err } - // If this is a config entry then update the config. - if e.Type == logEntryConfig { + // Apply special config & snapshot entries immediately. + // All other entries get appended to the log. + switch e.Type { + case logEntryConfig: l.tracef("ReadFrom: config") - - config := &Config{} - if err := NewConfigDecoder(bytes.NewReader(e.Data)).Decode(config); err != nil { - return err + if err := l.applyConfigLogEntry(e); err != nil { + return fmt.Errorf("apply config log entry: %s", err) } - l.mu.Lock() - if err := l.writeConfig(config); err != nil { - l.mu.Unlock() - return err + case logEntrySnapshot: + if err := l.applySnapshotLogEntry(e, r); err != nil { + return fmt.Errorf("apply snapshot log entry: %s", err) } - l.config = config - l.mu.Unlock() - continue - } - - // If this is a snapshot then load it. - if e.Type == logEntrySnapshot { - l.tracef("ReadFrom: snapshot") + default: + // Append entry to the log. if err := func() error { l.mu.Lock() defer l.mu.Unlock() - - if _, err := l.FSM.ReadFrom(r); err != nil { - return err + if err := l.append(e); err != nil { + return fmt.Errorf("append: %s", err) } - // Update the indicies & clear the entries. - index := l.FSM.Index() - l.lastLogIndex = index - l.commitIndex = index - l.entries = nil - return nil }(); err != nil { - l.tracef("ReadFrom: restore error: %s", err) return err } + } + } +} - l.tracef("ReadFrom: snapshot: restored") +// applyConfigLogEntry updates the config for a config log entry. +func (l *Log) applyConfigLogEntry(e *LogEntry) error { + // Parse configuration from the log entry. + config := &Config{} + if err := NewConfigDecoder(bytes.NewReader(e.Data)).Decode(config); err != nil { + return fmt.Errorf("decode config: %s", err) + } - // // Read the snapshot index off the end of the snapshot. - // var index uint64 - // if err := binary.Read(r, binary.BigEndian, &index); err != nil { - // return fmt.Errorf("read snapshot index: %s", err) - // } - // l.tracef("ReadFrom: snapshot: index=%d", index) + // Write the configuration to disk. + l.mu.Lock() + defer l.mu.Unlock() + if err := l.writeConfig(config); err != nil { + return fmt.Errorf("write config: %s", err) + } + l.config = config - continue - } + return nil +} - // Append entry to the log. - l.mu.Lock() - if l.state == Stopped { - l.mu.Unlock() - return nil - } - //l.tracef("ReadFrom: entry: index=%d / prev=%d / commit=%d", e.Index, l.lastLogIndex, l.commitIndex) - l.append(&e) - l.mu.Unlock() +// applySnapshotLogEntry restores a snapshot log entry. +func (l *Log) applySnapshotLogEntry(e *LogEntry, r io.Reader) error { + l.mu.Lock() + defer l.mu.Unlock() + + // Let the FSM rebuild its state from the data in r. + if _, err := l.FSM.ReadFrom(r); err != nil { + return fmt.Errorf("fsm restore: %s", err) } + + // Update the indicies & clear the entries. + index := l.FSM.Index() + l.lastLogIndex = index + l.commitIndex = index + l.entries = nil + + return nil } // Initializes the ReadFrom() call under a lock and swaps out the readers. diff --git a/raft/log_test.go b/raft/log_test.go index dc1b20798f1..aa4818d6652 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -10,6 +10,8 @@ import ( "math/rand" "net/url" "os" + "path/filepath" + "strings" "sync" "testing" "time" @@ -26,6 +28,121 @@ func TestLog_Open_ErrOpen(t *testing.T) { } } +// Ensure that opening a log to an invalid path returns an error. +func TestLog_Open_ErrMkdir(t *testing.T) { + path := tempfile() + os.MkdirAll(path, 0) + defer os.Remove(path) + + l := NewLog(url.URL{Host: "log0"}) + l.Log.FSM = &FSM{} + defer l.Close() + if err := l.Open(filepath.Join(path, "x")); err == nil || !strings.Contains(err.Error(), `permission denied`) { + t.Fatal(err) + } +} + +// Ensure that opening a log with an inaccessible ID path returns an error. +func TestLog_Open_ErrInaccessibleID(t *testing.T) { + path := tempfile() + MustWriteFile(filepath.Join(path, "id"), []byte(`1`)) + MustChmod(filepath.Join(path, "id"), 0) + defer os.Remove(path) + + l := NewLog(url.URL{Host: "log0"}) + l.Log.FSM = &FSM{} + defer l.Close() + if err := l.Open(path); err == nil || !strings.Contains(err.Error(), `permission denied`) { + t.Fatal(err) + } +} + +// Ensure that opening a log with an invalid ID returns an error. +func TestLog_Open_ErrInvalidID(t *testing.T) { + path := tempfile() + MustWriteFile(filepath.Join(path, "id"), []byte(`X`)) + defer os.Remove(path) + + l := NewLog(url.URL{Host: "log0"}) + l.Log.FSM = &FSM{} + defer l.Close() + if err := l.Open(path); err == nil || err.Error() != `read id: strconv.ParseUint: parsing "X": invalid syntax` { + t.Fatal(err) + } +} + +// Ensure that opening a log with an inaccesible term path returns an error. +func TestLog_Open_ErrInaccessibleTerm(t *testing.T) { + path := tempfile() + MustWriteFile(filepath.Join(path, "id"), []byte(`1`)) + MustWriteFile(filepath.Join(path, "term"), []byte(`1`)) + MustChmod(filepath.Join(path, "term"), 0) + defer os.Remove(path) + + l := NewLog(url.URL{Host: "log0"}) + l.Log.FSM = &FSM{} + defer l.Close() + if err := l.Open(path); err == nil || !strings.Contains(err.Error(), `permission denied`) { + t.Fatal(err) + } +} + +// Ensure that opening a log with an invalid term returns an error. +func TestLog_Open_ErrInvalidTerm(t *testing.T) { + path := tempfile() + MustWriteFile(filepath.Join(path, "id"), []byte(`1`)) + MustWriteFile(filepath.Join(path, "term"), []byte(`X`)) + defer os.Remove(path) + + l := NewLog(url.URL{Host: "log0"}) + l.Log.FSM = &FSM{} + defer l.Close() + if err := l.Open(path); err == nil || err.Error() != `read term: strconv.ParseUint: parsing "X": invalid syntax` { + t.Fatal(err) + } +} + +// Ensure that opening an inaccessible config path returns an error. +func TestLog_Open_ErrInaccessibleConfig(t *testing.T) { + path := tempfile() + MustWriteFile(filepath.Join(path, "id"), []byte(`1`)) + MustWriteFile(filepath.Join(path, "term"), []byte(`1`)) + MustWriteFile(filepath.Join(path, "config"), []byte(`{}`)) + MustChmod(filepath.Join(path, "config"), 0) + defer os.Remove(path) + + l := NewLog(url.URL{Host: "log0"}) + l.Log.FSM = &FSM{} + defer l.Close() + if err := l.Open(path); err == nil || !strings.Contains(err.Error(), `permission denied`) { + t.Fatal(err) + } +} + +// Ensure that opening an invalid config returns an error. +func TestLog_Open_ErrInvalidConfig(t *testing.T) { + path := tempfile() + MustWriteFile(filepath.Join(path, "id"), []byte(`1`)) + MustWriteFile(filepath.Join(path, "term"), []byte(`1`)) + MustWriteFile(filepath.Join(path, "config"), []byte(`{`)) + defer os.Remove(path) + + l := NewLog(url.URL{Host: "log0"}) + l.Log.FSM = &FSM{} + defer l.Close() + if err := l.Open(path); err == nil || err.Error() != `read config: unexpected EOF` { + t.Fatal(err) + } +} + +// Ensure that initializing a closed log returns an error. +func TestLog_Initialize_ErrClosed(t *testing.T) { + l := NewLog(url.URL{Host: "log0"}) + if err := l.Initialize(); err != raft.ErrClosed { + t.Fatal(err) + } +} + // Ensure that a log can be checked for being open. func TestLog_Opened(t *testing.T) { l := NewInitializedLog(url.URL{Host: "log0"}) @@ -106,6 +223,29 @@ func TestCluster_ID_Sequential(t *testing.T) { } } +// Ensure that all the URLs for a cluster can be returned. +func TestServer_URLs(t *testing.T) { + c := NewCluster(fsmFunc) + defer c.Close() + if a := c.Logs[0].URLs(); len(a) != 3 { + t.Fatalf("unexpected url count: %d", len(a)) + } else if a[0] != (url.URL{Host: "log0"}) { + t.Fatalf("unexpected url(0): %s", a[0]) + } else if a[1] != (url.URL{Host: "log1"}) { + t.Fatalf("unexpected url(1): %s", a[1]) + } else if a[2] != (url.URL{Host: "log2"}) { + t.Fatalf("unexpected url(2): %s", a[2]) + } +} + +// Ensure that no URLs are returned for a server that has no config. +func TestServer_URLs_NoConfig(t *testing.T) { + l := NewLog(url.URL{Host: "log0"}) + if a := l.URLs(); len(a) != 0 { + t.Fatalf("unexpected url count: %d", len(a)) + } +} + // Ensure that cluster starts with one leader and multiple followers. func TestCluster_State(t *testing.T) { c := NewCluster(fsmFunc) @@ -508,11 +648,17 @@ func NewLog(u url.URL) *Log { return l } -// NewInitializedLog returns a new initialized Node. -func NewInitializedLog(u url.URL) *Log { +// OpenLog returns a new open Log. +func OpenLog(u url.URL) *Log { l := NewLog(u) l.Log.FSM = &FSM{} l.MustOpen() + return l +} + +// NewInitializedLog returns a new initialized Log. +func NewInitializedLog(u url.URL) *Log { + l := OpenLog(u) l.MustInitialize() return l } @@ -645,6 +791,23 @@ func seq() func() int64 { } } +// MustWriteFile writes data to a file. Panic on error. +func MustWriteFile(filename string, data []byte) { + if err := os.MkdirAll(filepath.Dir(filename), 0777); err != nil { + panic(err.Error()) + } + if err := ioutil.WriteFile(filename, data, 0666); err != nil { + panic(err.Error()) + } +} + +// MustChmod changes mode on a file. Panic on error. +func MustChmod(name string, mode os.FileMode) { + if err := os.Chmod(name, mode); err != nil { + panic(err.Error()) + } +} + // tempfile returns the path to a non-existent file in the temp directory. func tempfile() string { f, _ := ioutil.TempFile("", "raft-") diff --git a/raft/transport_test.go b/raft/transport_test.go index 9fdbe55b46d..3123fd7dfe8 100644 --- a/raft/transport_test.go +++ b/raft/transport_test.go @@ -70,6 +70,23 @@ func TestHTTPTransport_Join_ErrInvalidID(t *testing.T) { } } +// Ensure the response from a join contains a valid leader id. +func TestHTTPTransport_Join_ErrInvalidLeaderID(t *testing.T) { + // Start mock HTTP server. + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Raft-ID", "1") + w.Header().Set("X-Raft-Leader-ID", "xxx") + })) + defer s.Close() + + // Execute join against test server. + u, _ := url.Parse(s.URL) + _, _, _, err := (&raft.HTTPTransport{}).Join(*u, url.URL{Host: "local"}) + if err == nil || err.Error() != `invalid leader id: "xxx"` { + t.Fatalf("unexpected error: %s", err) + } +} + // Ensure the response from a join contains a valid config. func TestHTTPTransport_Join_ErrInvalidConfig(t *testing.T) { // Start mock HTTP server. From 27f4a3efec5c403dc159ff96cf6df27e3789e65a Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 29 Mar 2015 15:04:11 -0600 Subject: [PATCH 3/4] Fix raft transition and reader edge cases. --- raft/errors.go | 14 ++- raft/log.go | 242 ++++++++++++++++++++++++++++------------- raft/log_test.go | 3 +- raft/transport_test.go | 7 +- 4 files changed, 183 insertions(+), 83 deletions(-) diff --git a/raft/errors.go b/raft/errors.go index acf2a33dda6..4667e7cf64d 100644 --- a/raft/errors.go +++ b/raft/errors.go @@ -28,10 +28,6 @@ var ( // ErrOutOfDateLog is returned when a candidate's log is not up to date. ErrOutOfDateLog = errors.New("out of date log") - // ErrUncommittedIndex is returned when a stream is started from an - // uncommitted log index. - ErrUncommittedIndex = errors.New("uncommitted index") - // ErrAlreadyVoted is returned when a vote has already been cast for // a different candidate in the same election term. ErrAlreadyVoted = errors.New("already voted") @@ -50,4 +46,14 @@ var ( // ErrDuplicateNodeURL is returned when adding a node with an existing URL. ErrDuplicateNodeURL = errors.New("duplicate node url") + + // ErrSnapshotRequired returned when reading from an out-of-order log. + // The snapshot will be retrieved on the next reader request. + ErrSnapshotRequired = errors.New("snapshot required") +) + +// Internal marker errors. +var ( + errClosing = errors.New("closing marker") + errTransitioning = errors.New("transitioning marker") ) diff --git a/raft/log.go b/raft/log.go index 6b92ec9a0f5..60b3c280bee 100644 --- a/raft/log.go +++ b/raft/log.go @@ -40,7 +40,7 @@ const logEntryHeaderSize = 8 + 8 + 8 // sz+index+term // WaitInterval represents the amount of time between checks to the applied index. // This is used by clients wanting to wait until a given index is processed. -const WaitInterval = 1 * time.Millisecond +const WaitInterval = 100 * time.Millisecond // State represents whether the log is a follower, candidate, or leader. type State int @@ -112,8 +112,9 @@ type Log struct { leaderID uint64 // Current state of the log. - // The following state transitions can occur: - state State + // The transitioning channel is closed whenever state is changed. + state State + transitioning chan struct{} // In-memory log entries. // Followers replicate these entries from the Leader. @@ -402,8 +403,8 @@ func (l *Log) Close() error { func (l *Log) close() error { l.tracef("closing...") - // Close the reader, if open. - l.closeReader() + // Remove the reader. + _ = l.setReader(nil) // Notify goroutines of closing and wait outside of lock. if l.closing != nil { @@ -432,11 +433,39 @@ func (l *Log) close() error { return nil } -func (l *Log) closeReader() { +func (l *Log) setReaderWithLock(r io.ReadCloser) error { + l.mu.Lock() + defer l.mu.Unlock() + return l.setReader(r) +} + +func (l *Log) setReader(r io.ReadCloser) error { if l.reader != nil { _ = l.reader.Close() l.reader = nil } + + // Ignore if there is no new reader. + if r == nil { + return nil + } + + // Close reader immediately and ignore if log is closed. + if !l.opened() { + _ = r.Close() + return ErrClosed + } + + // Ignore if setting while transitioning state. + select { + case <-l.transitioning: + return errTransitioning + default: + } + + // Set new reader. + l.reader = r + return nil } func (l *Log) setID(id uint64) { @@ -494,6 +523,12 @@ func (l *Log) writeTerm(term uint64) error { return ioutil.WriteFile(l.termPath(), b, 0666) } +// setTerm sets the current term if it's higher. +func (l *Log) setTerm(term uint64) { + l.term = term + l.votedFor = 0 +} + // readConfig reads the configuration from disk. func (l *Log) readConfig() (*Config, error) { // Read config from disk. @@ -568,8 +603,7 @@ func (l *Log) Initialize() error { if err := l.writeTerm(term); err != nil { return fmt.Errorf("write term: %s", err) } - l.term = term - l.votedFor = 0 + l.setTerm(term) l.lastLogTerm = term l.leaderID = l.id @@ -753,10 +787,19 @@ func (l *Log) stateLoop(closing <-chan struct{}, state State, stateChanged chan for { // Transition to new state. - l.Logger.Printf("log state change: %s => %s", l.state, state) - l.mu.Lock() - l.state = state - l.mu.Unlock() + var transitioning chan struct{} + func() { + l.mu.Lock() + defer l.mu.Unlock() + + l.Logger.Printf("log state change: %s => %s", l.state, state) + l.state = state + l.transitioning = make(chan struct{}, 0) + transitioning = l.transitioning + + // Remove previous reader, if one exists. + _ = l.setReader(nil) + }() // Notify caller on first state changes. if stateChanged != nil { @@ -764,11 +807,6 @@ func (l *Log) stateLoop(closing <-chan struct{}, state State, stateChanged chan stateChanged = nil } - // Remove previous reader, if one exists. - if l.reader != nil { - _ = l.reader.Close() - } - // Execute the appropriate state loop. // Each loop returns the next state to transition to. switch state { @@ -791,19 +829,18 @@ func (l *Log) followerLoop(closing <-chan struct{}) State { // Ensure all follower goroutines complete before transitioning to another state. var wg sync.WaitGroup defer wg.Wait() - var transitioning = make(chan struct{}) - defer close(transitioning) + defer l.setReaderWithLock(nil) + defer close(l.transitioning) // Read log from leader in a separate goroutine. wg.Add(1) - go l.readFromLeader(&wg, transitioning) + go l.readFromLeader(&wg) for { select { case <-closing: return Stopped case ch := <-l.Clock.AfterElectionTimeout(): - l.closeReader() close(ch) return Candidate case hb := <-l.heartbeats: @@ -812,25 +849,31 @@ func (l *Log) followerLoop(closing <-chan struct{}) State { // Update term, commit index & leader. l.mu.Lock() if hb.term > l.term { - l.term = hb.term - l.votedFor = 0 + l.setTerm(hb.term) } if hb.commitIndex > l.commitIndex { l.commitIndex = hb.commitIndex } l.leaderID = hb.leaderID l.mu.Unlock() + + case term := <-l.terms: + l.mu.Lock() + if term > l.term { + l.setTerm(term) + } + l.mu.Unlock() } } } -func (l *Log) readFromLeader(wg *sync.WaitGroup, transitioning <-chan struct{}) { +func (l *Log) readFromLeader(wg *sync.WaitGroup) { defer wg.Done() l.tracef("readFromLeader:") for { select { - case <-transitioning: + case <-l.transitioning: l.tracef("readFromLeader: exiting") return default: @@ -845,7 +888,7 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup, transitioning <-chan struct{}) // If no leader exists then wait momentarily and retry. if u.Host == "" { l.tracef("readFromLeader: no leader") - time.Sleep(100 * time.Millisecond) + time.Sleep(500 * time.Millisecond) continue } @@ -854,6 +897,8 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup, transitioning <-chan struct{}) r, err := l.Transport.ReadFrom(u, id, term, lastLogIndex) if err != nil { l.Logger.Printf("connect stream: %s", err) + time.Sleep(500 * time.Millisecond) + continue } // Attach the stream to the log. @@ -863,17 +908,32 @@ func (l *Log) readFromLeader(wg *sync.WaitGroup, transitioning <-chan struct{}) } } -// truncate removes all uncommitted entries. -func (l *Log) truncate() { +// truncateTo removes all uncommitted entries up to index. +func (l *Log) truncateTo(index uint64) { + assert(index >= l.commitIndex, "cannot truncate to before the commit index: index=%d, commit=%d", index, l.commitIndex) + + // Ignore if there are no entries. + // Ignore if all entries are before the index. if len(l.entries) == 0 { return + } else if l.entries[len(l.entries)-1].Index < index { + return } - entmin := l.entries[0].Index - l.tracef("trunc: min=%d, commit=%d", l.commitIndex, entmin) - assert(l.commitIndex >= entmin, "commit index before lowest entry: commit=%d, entmin=%d", l.commitIndex, entmin) - l.entries = l.entries[:l.commitIndex-entmin-1] - l.lastLogIndex = l.commitIndex + // If all entries are after the index, remove all. + if l.entries[0].Index > index { + l.entries = nil + l.lastLogIndex, l.lastLogTerm = index, l.term + return + } + + // Otherwise slice entries starting from index. + emin, emax := l.entries[0].Index, l.entries[len(l.entries)-1].Index + l.tracef("trunc: entries=[%d,%d], index=%d", emin, emax, index) + l.entries = l.entries[:index-emin+1] + l.lastLogIndex = index + + assert(l.entries[len(l.entries)-1].Index == index, "last entry in truncation not index: emax=%d, index=%d", l.entries[len(l.entries)-1].Index, index) } // candidateLoop requests vote from other nodes in an attempt to become leader. @@ -893,13 +953,12 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State { // Ensure all candidate goroutines complete before transitioning to another state. var wg sync.WaitGroup defer wg.Wait() - var transitioning = make(chan struct{}) - defer close(transitioning) + defer close(l.transitioning) // Read log from leader in a separate goroutine. wg.Add(1) elected := make(chan struct{}, 1) - go l.elect(term, elected, &wg, transitioning) + go l.elect(term, elected, &wg) for { select { @@ -907,14 +966,27 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State { return Stopped case hb := <-l.heartbeats: l.mu.Lock() - if hb.term >= l.term { - l.term = hb.term - l.votedFor = 0 + if hb.term >= term { + l.setTerm(hb.term) l.leaderID = hb.leaderID l.mu.Unlock() return Follower } l.mu.Unlock() + case newTerm := <-l.terms: + // Ignore if it's not after this current term. + if newTerm <= term { + continue + } + + // Check against the current term since that may have changed. + l.mu.Lock() + if newTerm >= l.term { + l.setTerm(newTerm) + l.mu.Unlock() + return Follower + } + l.mu.Unlock() case <-elected: return Leader case ch := <-l.Clock.AfterElectionTimeout(): @@ -924,7 +996,7 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State { } } -func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup, transitioning <-chan struct{}) { +func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup) { defer wg.Done() // Ensure we are in the same term and copy properties. @@ -963,7 +1035,7 @@ func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup, tran // Wait until log transitions to another state or we receive a vote. select { - case <-transitioning: + case <-l.transitioning: return case <-votes: voteN++ @@ -979,8 +1051,7 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State { // Ensure all leader goroutines complete before transitioning to another state. var wg sync.WaitGroup defer wg.Wait() - var transitioning = make(chan struct{}) - defer close(transitioning) + defer close(l.transitioning) // Retrieve leader's term. l.mu.Lock() @@ -992,7 +1063,7 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State { // Send hearbeat to followers. wg.Add(1) committed := make(chan uint64, 1) - go l.heartbeater(term, committed, &wg, transitioning) + go l.heartbeater(term, committed, &wg) // Wait for close, new leader, or new heartbeat response. select { @@ -1002,8 +1073,8 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State { case newTerm := <-l.terms: // step down on higher term if newTerm > term { l.mu.Lock() - l.term = newTerm - l.truncate() + l.setTerm(newTerm) + l.truncateTo(l.commitIndex) l.mu.Unlock() return Follower } @@ -1012,8 +1083,8 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State { case hb := <-l.heartbeats: // step down on higher term if hb.term > term { l.mu.Lock() - l.term = hb.term - l.truncate() + l.setTerm(hb.term) + l.truncateTo(l.commitIndex) l.mu.Unlock() return Follower } @@ -1038,7 +1109,7 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State { } // heartbeater continually sends heartbeats to all peers. -func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup, transitioning <-chan struct{}) { +func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup) { defer wg.Done() // Ensure term is correct and retrieve current state. @@ -1081,7 +1152,7 @@ func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup indexes[0] = localIndex for { select { - case <-transitioning: + case <-l.transitioning: l.tracef("send heartbeat: transitioning") return case peerIndex := <-peerIndices: @@ -1111,17 +1182,6 @@ type heartbeatResponse struct { peerIndex uint64 } -// check looks if the channel has any messages. -// If it does then errDone is returned, otherwise nil is returned. -func check(done chan struct{}) error { - select { - case <-done: - return errDone - default: - return nil - } -} - // Apply executes a command against the log. // This function returns once the command has been committed to the log. func (l *Log) Apply(command []byte) (uint64, error) { @@ -1167,6 +1227,7 @@ func (l *Log) Wait(idx uint64) error { for { l.mu.Lock() state, index := l.state, l.FSM.Index() + l.tracef("WAIT> I=%d, Aⁱ=%d, Cⁱ=%d, LLⁱ=%d", idx, index, l.commitIndex, l.lastLogIndex) l.mu.Unlock() if state == Stopped { @@ -1219,6 +1280,17 @@ func (l *Log) append(e *LogEntry) error { return nil } + // If the entry is not the next then it may have changed leaders. + // Attempt to trim the log to the index if it is not committed yet. + if e.Index > l.lastLogIndex+1 { + if e.Index >= l.commitIndex { + l.truncateTo(e.Index) + } else if e.Index < l.commitIndex { + l.lastLogIndex = 0 + return ErrSnapshotRequired + } + } + assert(e.Index == l.lastLogIndex+1, "log entry skipped(%d): idx=%d, prev=%d", l.id, e.Index, l.lastLogIndex) // Encode entry to a byte slice. @@ -1274,7 +1346,7 @@ func (l *Log) applier(closing <-chan struct{}) { // Keep applying the next entry until there are no more committed // entries that have not been applied to the state machine. for { - if err := l.applyNextUnappliedEntry(closing); err == errDone { + if err := l.applyNextUnappliedEntry(closing); err == errClosing { break } else if err != nil { panic(err.Error()) @@ -1299,13 +1371,13 @@ func (l *Log) applyNextUnappliedEntry(closing <-chan struct{}) error { // Verify, under lock, that we're not closing. select { case <-closing: - return errDone + return errClosing default: } // Ignore if there are no entries in the log. if len(l.entries) == 0 { - return errDone + return errClosing } // Determine next index to apply. @@ -1313,9 +1385,9 @@ func (l *Log) applyNextUnappliedEntry(closing <-chan struct{}) error { // Ignore if the entry is not streamed to the log yet. index := l.FSM.Index() + 1 if index > l.commitIndex { - return errDone + return errClosing } else if index > l.entries[len(l.entries)-1].Index { - return errDone + return errClosing } // Retrieve next entry. @@ -1489,6 +1561,7 @@ func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64 select { case l.heartbeats <- heartbeat{term: term, commitIndex: commitIndex, leaderID: leaderID}: default: + } l.tracef("recv heartbeat: (term=%d, commit=%d, leaderID: %d) (index=%d, term=%d)", term, commitIndex, leaderID, l.lastLogIndex, l.term) @@ -1523,7 +1596,16 @@ func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) ( return ErrOutOfDateLog } - // Vote for candidate. + // Notify term change. + if term > l.term { + select { + case l.terms <- term: + default: + } + } + + // Vote for candidate & increase term. + l.term = term l.votedFor = candidateID return nil @@ -1565,7 +1647,6 @@ func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error // Do not begin streaming if: // 1. Node is not the leader. // 2. Term is after current term. - // 3. Index is after the last entry's index. if l.state != Leader { return nil, ErrNotLeader } else if term > l.term { @@ -1574,8 +1655,13 @@ func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error default: } return nil, ErrNotLeader - } else if index > l.lastLogIndex { - return nil, fmt.Errorf("entry not found: follower=%d, leader=%d", index, l.lastLogIndex) + } + + // If the index is past the leader's log then reset and begin from the end. + // The follower will check the index and trim its log as needed. If the + // follower cannot trim its log then it needs to retrieve a snapshot. + if index > l.lastLogIndex { + index = l.lastLogIndex } // Write configuration. @@ -1686,8 +1772,10 @@ func (l *Log) Flush() { // ReadFrom continually reads log entries from a reader. func (l *Log) ReadFrom(r io.ReadCloser) error { l.tracef("ReadFrom") - if err := l.initReadFrom(r); err != nil { + if err := l.initReadFrom(r); err == errTransitioning { return err + } else if err != nil { + return fmt.Errorf("init read from: %s", err) } // If a nil reader is passed in then exit. @@ -1785,11 +1873,13 @@ func (l *Log) initReadFrom(r io.ReadCloser) error { return ErrClosed } - // Remove previous reader, if one exists. - l.closeReader() + // Close previous reader & set new one. + if err := l.setReader(r); err == errTransitioning { + return err + } else if err != nil { + return fmt.Errorf("set reader: %s", err) + } - // Set new reader. - l.reader = r return nil } @@ -1869,8 +1959,6 @@ func (p uint64Slice) Len() int { return len(p) } func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] } func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -var errDone = errors.New("done") - func assert(condition bool, msg string, v ...interface{}) { if !condition { panic(fmt.Sprintf("assert failed: "+msg, v...)) diff --git a/raft/log_test.go b/raft/log_test.go index aa4818d6652..786cfaea8f6 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -599,7 +599,8 @@ func (c *Cluster) Apply(data []byte) (uint64, error) { } return index, err } - time.Sleep(1 * time.Millisecond) + warn("no leader found in cluster, retrying in 100ms...") + time.Sleep(100 * time.Millisecond) } } diff --git a/raft/transport_test.go b/raft/transport_test.go index 3123fd7dfe8..d88116a2a47 100644 --- a/raft/transport_test.go +++ b/raft/transport_test.go @@ -477,7 +477,7 @@ func (b *streamingBuffer) Read(p []byte) (n int, err error) { } // If not closed then wait a bit and try again. - time.Sleep(1 * time.Millisecond) + time.Sleep(10 * time.Millisecond) continue } @@ -489,6 +489,11 @@ func (b *streamingBuffer) Read(p []byte) (n int, err error) { func (b *streamingBuffer) Write(p []byte) (n int, err error) { b.mu.Lock() defer b.mu.Unlock() + + if b.closed { + return 0, fmt.Errorf("streaming buffer closed") + } + return b.buf.Write(p) } From 189aaa0d8249173a8b56d091c8b99ded9cae6f6a Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 29 Mar 2015 15:31:01 -0600 Subject: [PATCH 4/4] Remove debug statements. --- raft/log.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/raft/log.go b/raft/log.go index 60b3c280bee..b95d63063e7 100644 --- a/raft/log.go +++ b/raft/log.go @@ -40,7 +40,7 @@ const logEntryHeaderSize = 8 + 8 + 8 // sz+index+term // WaitInterval represents the amount of time between checks to the applied index. // This is used by clients wanting to wait until a given index is processed. -const WaitInterval = 100 * time.Millisecond +const WaitInterval = 1 * time.Millisecond // State represents whether the log is a follower, candidate, or leader. type State int @@ -1227,7 +1227,6 @@ func (l *Log) Wait(idx uint64) error { for { l.mu.Lock() state, index := l.state, l.FSM.Index() - l.tracef("WAIT> I=%d, Aⁱ=%d, Cⁱ=%d, LLⁱ=%d", idx, index, l.commitIndex, l.lastLogIndex) l.mu.Unlock() if state == Stopped { @@ -1701,11 +1700,6 @@ func (l *Log) writeTo(writer *logWriter, id, term, index uint64) error { } flushWriter(w) - // // Write snapshot index at the end and flush. - // if err := binary.Write(w, binary.BigEndian, snapshotIndex); err != nil { - // return fmt.Errorf("write snapshot index: %s", err) - // } - // Write entries since the snapshot occurred and begin tailing writer. if err := l.advanceWriter(writer); err != nil { return err