Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raft stability #2111

Merged
merged 5 commits into from
Mar 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 30 additions & 28 deletions messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ func (b *Broker) Topic(id uint64) *Topic {

// Index returns the highest index seen by the broker across all topics.
// Returns 0 if the broker is closed.
func (b *Broker) Index() (uint64, error) {
func (b *Broker) Index() uint64 {
b.mu.RLock()
defer b.mu.RUnlock()
return b.index, nil
return b.index
}

// opened returns true if the broker is in an open and running state.
Expand Down Expand Up @@ -256,8 +256,8 @@ func (b *Broker) setMaxIndex(index uint64) error {
return nil
}

// Snapshot streams the current state of the broker and returns the index.
func (b *Broker) Snapshot(w io.Writer) (uint64, error) {
// WriteTo writes a snapshot of the broker to w.
func (b *Broker) WriteTo(w io.Writer) (int64, error) {
// TODO: Prevent truncation during snapshot.

// Calculate header under lock.
Expand Down Expand Up @@ -291,8 +291,7 @@ func (b *Broker) Snapshot(w io.Writer) (uint64, error) {
}
}

// Return the snapshot and its last applied index.
return hdr.Index, nil
return 0, nil
}

// createSnapshotHeader creates a snapshot header.
Expand Down Expand Up @@ -352,32 +351,32 @@ func copyFileN(w io.Writer, path string, n int64) (int64, error) {
return io.CopyN(w, f, n)
}

// Restore reads the broker state.
func (b *Broker) Restore(r io.Reader) error {
// ReadFrom reads a broker snapshot from r.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: describe what the returned int64 is?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the number of bytes read but we're not supporting it. It's just an implementation of the io.ReaderFrom interface. We return an error for any partial reads.

func (b *Broker) ReadFrom(r io.Reader) (int64, error) {
b.mu.Lock()
defer b.mu.Unlock()

// Remove and recreate broker path.
if err := b.reset(); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("reset: %s", err)
return 0, fmt.Errorf("reset: %s", err)
} else if err = os.MkdirAll(b.path, 0777); err != nil {
return fmt.Errorf("mkdir: %s", err)
return 0, fmt.Errorf("mkdir: %s", err)
}

// Read header frame.
var sz uint32
if err := binary.Read(r, binary.BigEndian, &sz); err != nil {
return fmt.Errorf("read header size: %s", err)
return 0, fmt.Errorf("read header size: %s", err)
}
buf := make([]byte, sz)
if _, err := io.ReadFull(r, buf); err != nil {
return fmt.Errorf("read header: %s", err)
return 0, fmt.Errorf("read header: %s", err)
}

// Decode header.
sh := &snapshotHeader{}
if err := json.Unmarshal(buf, &sh); err != nil {
return fmt.Errorf("decode header: %s", err)
return 0, fmt.Errorf("decode header: %s", err)
}

// Close any topics which might be open and clear them out.
Expand All @@ -389,7 +388,7 @@ func (b *Broker) Restore(r io.Reader) error {

// Create topic directory.
if err := os.MkdirAll(t.Path(), 0777); err != nil {
return fmt.Errorf("make topic dir: %s", err)
return 0, fmt.Errorf("make topic dir: %s", err)
}

// Copy data from snapshot into segment files.
Expand All @@ -411,24 +410,24 @@ func (b *Broker) Restore(r io.Reader) error {

return nil
}(); err != nil {
return err
return 0, err
}
}

// Open topic.
if err := t.Open(); err != nil {
return fmt.Errorf("open topic: %s", err)
return 0, fmt.Errorf("open topic: %s", err)
}
b.topics[t.id] = t
}

// Set the highest seen index.
if err := b.setMaxIndex(sh.Index); err != nil {
return fmt.Errorf("set max index: %s", err)
return 0, fmt.Errorf("set max index: %s", err)
}
b.index = sh.Index

return nil
return 0, nil
}

// reset removes all files in the broker directory besides the raft directory.
Expand Down Expand Up @@ -570,20 +569,21 @@ type snapshotTopicSegment struct {
// It will panic for any errors that occur during Apply.
type RaftFSM struct {
Broker interface {
io.WriterTo
io.ReaderFrom

Apply(m *Message) error
Index() (uint64, error)
Index() uint64
SetMaxIndex(uint64) error
Snapshot(w io.Writer) (uint64, error)
Restore(r io.Reader) error
}
}

func (fsm *RaftFSM) Index() (uint64, error) { return fsm.Broker.Index() }
func (fsm *RaftFSM) Snapshot(w io.Writer) (uint64, error) { return fsm.Broker.Snapshot(w) }
func (fsm *RaftFSM) Restore(r io.Reader) error { return fsm.Broker.Restore(r) }
func (fsm *RaftFSM) Index() uint64 { return fsm.Broker.Index() }
func (fsm *RaftFSM) WriteTo(w io.Writer) (n int64, err error) { return fsm.Broker.WriteTo(w) }
func (fsm *RaftFSM) ReadFrom(r io.Reader) (n int64, err error) { return fsm.Broker.ReadFrom(r) }

// MustApply applies a raft command to the broker. Panic on error.
func (fsm *RaftFSM) MustApply(e *raft.LogEntry) {
// Apply applies a raft command to the broker.
func (fsm *RaftFSM) Apply(e *raft.LogEntry) error {
switch e.Type {
case raft.LogEntryCommand:
// Decode message.
Expand All @@ -595,15 +595,17 @@ func (fsm *RaftFSM) MustApply(e *raft.LogEntry) {

// Apply message.
if err := fsm.Broker.Apply(m); err != nil {
panic(err.Error())
return fmt.Errorf("broker apply: %s", err)
}

default:
// Move internal index forward if it's an internal raft comand.
if err := fsm.Broker.SetMaxIndex(e.Index); err != nil {
panic(fmt.Sprintf("set max index: idx=%d, err=%s", e.Index, err))
return fmt.Errorf("set max index: idx=%d, err=%s", e.Index, err)
}
}

return nil
}

// DefaultMaxSegmentSize is the largest a segment can get before starting a new segment.
Expand Down
28 changes: 14 additions & 14 deletions messaging/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestBroker_Apply(t *testing.T) {
}

// Verify broker high water mark.
if index, _ := b.Index(); index != 4 {
if index := b.Index(); index != 4 {
t.Fatalf("unexpected broker index: %d", index)
}
}
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestBroker_Reopen(t *testing.T) {
}

// Verify broker high water mark.
if index, _ := b.Index(); index != 4 {
if index := b.Index(); index != 4 {
t.Fatalf("unexpected broker index: %d", index)
}

Expand Down Expand Up @@ -188,16 +188,14 @@ func TestBroker_Snapshot(t *testing.T) {

// Snapshot the first broker.
var buf bytes.Buffer
if index, err := b0.Snapshot(&buf); err != nil {
if _, err := b0.WriteTo(&buf); err != nil {
t.Fatalf("snapshot error: %s", err)
} else if index != 4 {
t.Fatalf("unexpected snapshot index: %d", index)
}

// Restore to the second broker.
b1 := OpenBroker()
defer b1.Close()
if err := b1.Restore(&buf); err != nil {
if _, err := b1.ReadFrom(&buf); err != nil {
t.Fatalf("restore error: %s", err)
}

Expand All @@ -224,7 +222,7 @@ func TestBroker_Snapshot(t *testing.T) {
}

// Verify broker high water mark.
if index, _ := b1.Index(); index != 4 {
if index := b1.Index(); index != 4 {
t.Fatalf("unexpected broker index: %d", index)
}
}
Expand Down Expand Up @@ -268,8 +266,9 @@ func TestRaftFSM_MustApply_Message(t *testing.T) {
// Encode message and apply it as a log entry.
m := messaging.Message{TopicID: 20}
data, _ := m.MarshalBinary()
fsm.MustApply(&raft.LogEntry{Index: 2, Data: data})
if !called {
if err := fsm.Apply(&raft.LogEntry{Index: 2, Data: data}); err != nil {
t.Fatal(err)
} else if !called {
t.Fatal("Apply() not called")
}
}
Expand All @@ -289,8 +288,9 @@ func TestRaftFSM_MustApply_Internal(t *testing.T) {
}

// Encode message and apply it as a log entry.
fsm.MustApply(&raft.LogEntry{Type: raft.LogEntryAddPeer, Index: 2})
if !called {
if err := fsm.Apply(&raft.LogEntry{Type: raft.LogEntryAddPeer, Index: 2}); err != nil {
t.Fatal(err)
} else if !called {
t.Fatal("Apply() not called")
}
}
Expand Down Expand Up @@ -318,9 +318,9 @@ type RaftFSMBroker struct {
func (b *RaftFSMBroker) Apply(m *messaging.Message) error { return b.ApplyFunc(m) }
func (b *RaftFSMBroker) SetMaxIndex(index uint64) error { return b.SetMaxIndexFunc(index) }

func (b *RaftFSMBroker) Index() (uint64, error) { return 0, nil }
func (b *RaftFSMBroker) Snapshot(w io.Writer) (uint64, error) { return 0, nil }
func (b *RaftFSMBroker) Restore(r io.Reader) error { return nil }
func (b *RaftFSMBroker) Index() uint64 { return 0 }
func (b *RaftFSMBroker) WriteTo(w io.Writer) (n int64, err error) { return 0, nil }
func (b *RaftFSMBroker) ReadFrom(r io.Reader) (n int64, err error) { return 0, nil }

// Ensure a list of topics can be read from a directory.
func TestReadTopics(t *testing.T) {
Expand Down
14 changes: 10 additions & 4 deletions raft/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ var (
// ErrOutOfDateLog is returned when a candidate's log is not up to date.
ErrOutOfDateLog = errors.New("out of date log")

// ErrUncommittedIndex is returned when a stream is started from an
// uncommitted log index.
ErrUncommittedIndex = errors.New("uncommitted index")

// ErrAlreadyVoted is returned when a vote has already been cast for
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can't happen anymore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the stream starts from the highest available index on the leader and the follower will trim its log or re-request a snapshot if needed.

// a different candidate in the same election term.
ErrAlreadyVoted = errors.New("already voted")
Expand All @@ -50,4 +46,14 @@ var (

// ErrDuplicateNodeURL is returned when adding a node with an existing URL.
ErrDuplicateNodeURL = errors.New("duplicate node url")

// ErrSnapshotRequired returned when reading from an out-of-order log.
// The snapshot will be retrieved on the next reader request.
ErrSnapshotRequired = errors.New("snapshot required")
)

// Internal marker errors.
var (
errClosing = errors.New("closing marker")
errTransitioning = errors.New("transitioning marker")
)
19 changes: 11 additions & 8 deletions raft/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,22 @@ type IndexFSM struct {
}

// MustApply updates the index.
func (fsm *IndexFSM) MustApply(entry *LogEntry) { fsm.index = entry.Index }
func (fsm *IndexFSM) Apply(entry *LogEntry) error {
fsm.index = entry.Index
return nil
}

// Index returns the highest applied index.
func (fsm *IndexFSM) Index() (uint64, error) { return fsm.index, nil }
func (fsm *IndexFSM) Index() uint64 { return fsm.index }

// Snapshot writes the FSM's index as the snapshot.
func (fsm *IndexFSM) Snapshot(w io.Writer) (uint64, error) {
return fsm.index, binary.Write(w, binary.BigEndian, fsm.index)
// WriteTo writes a snapshot of the FSM to w.
func (fsm *IndexFSM) WriteTo(w io.Writer) (n int64, err error) {
return 0, binary.Write(w, binary.BigEndian, fsm.index)
}

// Restore reads the snapshot from the reader.
func (fsm *IndexFSM) Restore(r io.Reader) error {
return binary.Read(r, binary.BigEndian, &fsm.index)
// ReadFrom reads an FSM snapshot from r.
func (fsm *IndexFSM) ReadFrom(r io.Reader) (n int64, err error) {
return 0, binary.Read(r, binary.BigEndian, &fsm.index)
}

// tempfile returns the path to a non-existent file in the temp directory.
Expand Down
Loading