Skip to content

Commit 41b9e23

Browse files
committed
Cleans up some structure and comments based on review feedback.
1 parent 8ba6aca commit 41b9e23

File tree

3 files changed

+80
-74
lines changed

3 files changed

+80
-74
lines changed

fsm.go

+1
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ func (r *Raft) runFSM() {
109109
commitEntry.future.response = resp
110110
commitEntry.future.respond(nil)
111111
}
112+
112113
case <-r.shutdownCh:
113114
return
114115
}

future.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ type SnapshotFuture interface {
5353
Future
5454

5555
// Open is a function you can call to access the underlying snapshot and
56-
// its metadata.
56+
// its metadata. This must not be called until after the Error method
57+
// has returned.
5758
Open() (*SnapshotMeta, io.ReadCloser, error)
5859
}
5960

@@ -177,6 +178,11 @@ func (u *userSnapshotFuture) Open() (*SnapshotMeta, io.ReadCloser, error) {
177178
if u.opener == nil {
178179
return nil, nil, fmt.Errorf("no snapshot available")
179180
} else {
181+
// Invalidate the opener so it can't get called multiple times,
182+
// which isn't generally safe.
183+
defer func() {
184+
u.opener = nil
185+
}()
180186
return u.opener()
181187
}
182188
}

raft.go

+72-73
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,8 @@ func (r *Raft) leaderLoop() {
559559
}
560560

561561
case future := <-r.userRestoreCh:
562-
r.restoreUserSnapshot(future)
562+
err := r.restoreUserSnapshot(future.meta, future.reader)
563+
future.respond(err)
563564

564565
case c := <-r.configurationsCh:
565566
c.configurations = r.configurations.Clone()
@@ -699,84 +700,82 @@ func (r *Raft) quorumSize() int {
699700
// so that the snapshot will be sent to followers and used for any new joiners.
700701
// This can only be run on the leader, and returns a future that can be used to
701702
// block until complete.
702-
func (r *Raft) restoreUserSnapshot(future *userRestoreFuture) {
703+
func (r *Raft) restoreUserSnapshot(meta *SnapshotMeta, reader io.ReadCloser) error {
703704
defer metrics.MeasureSince([]string{"raft", "restoreUserSnapshot"}, time.Now())
704-
err := func() error {
705-
// Sanity check the version.
706-
version := future.meta.Version
707-
if version < SnapshotVersionMin || version > SnapshotVersionMax {
708-
return fmt.Errorf("unsupported snapshot version %d", version)
709-
}
710705

711-
// We don't support snapshots while there's a config change
712-
// outstanding since the snapshot doesn't have a means to
713-
// represent this state.
714-
committedIndex := r.configurations.committedIndex
715-
latestIndex := r.configurations.latestIndex
716-
if committedIndex != latestIndex {
717-
return fmt.Errorf("cannot restore snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
718-
latestIndex, committedIndex)
719-
}
706+
// Sanity check the version.
707+
version := meta.Version
708+
if version < SnapshotVersionMin || version > SnapshotVersionMax {
709+
return fmt.Errorf("unsupported snapshot version %d", version)
710+
}
720711

721-
// We will overwrite the snapshot metadata with the current term,
722-
// an index that's greater than the current index, or the last
723-
// index in the snapshot. It's important that we leave a hole in
724-
// the index so we know there's nothing in the Raft log there and
725-
// replication will fault and send the snapshot.
726-
term := r.getCurrentTerm()
727-
lastIndex := r.getLastIndex()
728-
if future.meta.Index > lastIndex {
729-
lastIndex = future.meta.Index
730-
}
731-
lastIndex++
712+
// We don't support snapshots while there's a config change
713+
// outstanding since the snapshot doesn't have a means to
714+
// represent this state.
715+
committedIndex := r.configurations.committedIndex
716+
latestIndex := r.configurations.latestIndex
717+
if committedIndex != latestIndex {
718+
return fmt.Errorf("cannot restore snapshot now, wait until the configuration entry at %v has been applied (have applied %v)",
719+
latestIndex, committedIndex)
720+
}
732721

733-
// Dump the snapshot. Note that we use the latest configuration,
734-
// not the one that came with the snapshot.
735-
sink, err := r.snapshots.Create(version, lastIndex, term,
736-
r.configurations.latest, r.configurations.latestIndex, r.trans)
737-
if err != nil {
738-
return fmt.Errorf("failed to create snapshot: %v", err)
739-
}
740-
n, err := io.Copy(sink, future.reader)
741-
if err != nil {
742-
sink.Cancel()
743-
return fmt.Errorf("failed to write snapshot: %v", err)
744-
}
745-
if n != future.meta.Size {
746-
sink.Cancel()
747-
return fmt.Errorf("failed to write snapshot, size didn't match (%d != %d)", n, future.meta.Size)
748-
}
749-
if err := sink.Close(); err != nil {
750-
return fmt.Errorf("failed to close snapshot: %v", err)
751-
}
752-
r.logger.Printf("[INFO] raft: Copied %d bytes to local snapshot", n)
722+
// We will overwrite the snapshot metadata with the current term,
723+
// an index that's greater than the current index, or the last
724+
// index in the snapshot. It's important that we leave a hole in
725+
// the index so we know there's nothing in the Raft log there and
726+
// replication will fault and send the snapshot.
727+
term := r.getCurrentTerm()
728+
lastIndex := r.getLastIndex()
729+
if meta.Index > lastIndex {
730+
lastIndex = meta.Index
731+
}
732+
lastIndex++
753733

754-
// Restore the snapshot into the FSM. If this fails we are in a
755-
// bad state so we panic to take ourselves out.
756-
fsm := &restoreFuture{ID: sink.ID()}
757-
fsm.init()
758-
select {
759-
case r.fsmRestoreCh <- fsm:
760-
case <-r.shutdownCh:
761-
return ErrRaftShutdown
762-
}
763-
if err := fsm.Error(); err != nil {
764-
panic(fmt.Errorf("failed to restore snapshot: %v", err))
765-
}
734+
// Dump the snapshot. Note that we use the latest configuration,
735+
// not the one that came with the snapshot.
736+
sink, err := r.snapshots.Create(version, lastIndex, term,
737+
r.configurations.latest, r.configurations.latestIndex, r.trans)
738+
if err != nil {
739+
return fmt.Errorf("failed to create snapshot: %v", err)
740+
}
741+
n, err := io.Copy(sink, reader)
742+
if err != nil {
743+
sink.Cancel()
744+
return fmt.Errorf("failed to write snapshot: %v", err)
745+
}
746+
if n != meta.Size {
747+
sink.Cancel()
748+
return fmt.Errorf("failed to write snapshot, size didn't match (%d != %d)", n, meta.Size)
749+
}
750+
if err := sink.Close(); err != nil {
751+
return fmt.Errorf("failed to close snapshot: %v", err)
752+
}
753+
r.logger.Printf("[INFO] raft: Copied %d bytes to local snapshot", n)
766754

767-
// We set the last log so it looks like we've stored the empty
768-
// index we burned. The last applied is set because we made the
769-
// FSM take the snapshot state, and we store the last snapshot
770-
// in the stable store since we created a snapshot as part of
771-
// this process.
772-
r.setLastLog(lastIndex, term)
773-
r.setLastApplied(lastIndex)
774-
r.setLastSnapshot(lastIndex, term)
775-
776-
r.logger.Printf("[INFO] raft: Restored user snapshot (index %d)", lastIndex)
777-
return nil
778-
}()
779-
future.respond(err)
755+
// Restore the snapshot into the FSM. If this fails we are in a
756+
// bad state so we panic to take ourselves out.
757+
fsm := &restoreFuture{ID: sink.ID()}
758+
fsm.init()
759+
select {
760+
case r.fsmRestoreCh <- fsm:
761+
case <-r.shutdownCh:
762+
return ErrRaftShutdown
763+
}
764+
if err := fsm.Error(); err != nil {
765+
panic(fmt.Errorf("failed to restore snapshot: %v", err))
766+
}
767+
768+
// We set the last log so it looks like we've stored the empty
769+
// index we burned. The last applied is set because we made the
770+
// FSM take the snapshot state, and we store the last snapshot
771+
// in the stable store since we created a snapshot as part of
772+
// this process.
773+
r.setLastLog(lastIndex, term)
774+
r.setLastApplied(lastIndex)
775+
r.setLastSnapshot(lastIndex, term)
776+
777+
r.logger.Printf("[INFO] raft: Restored user snapshot (index %d)", lastIndex)
778+
return nil
780779
}
781780

782781
// appendConfigurationEntry changes the configuration and adds a new

0 commit comments

Comments
 (0)