Skip to content

Commit def7451

Browse files
authored
Merge pull request hashicorp#189 from hashicorp/inmem-snapshots
Added an in-mem snapshot store
2 parents e1d3deb + fcbbf35 commit def7451

File tree

2 files changed

+213
-0
lines changed

2 files changed

+213
-0
lines changed

inmem_snapshot.go

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package raft
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io"
7+
"io/ioutil"
8+
)
9+
10+
// InmemSnapshotStore implements the SnapshotStore interface and
11+
// retains only the most recent snapshot
12+
type InmemSnapshotStore struct {
13+
latest *InmemSnapshotSink
14+
hasSnapshot bool
15+
}
16+
17+
// InmemSnapshotSink implements SnapshotSink in memory
18+
type InmemSnapshotSink struct {
19+
meta SnapshotMeta
20+
contents *bytes.Buffer
21+
}
22+
23+
// NewInmemSnapshotStore creates a blank new InmemSnapshotStore
24+
func NewInmemSnapshotStore() *InmemSnapshotStore {
25+
return &InmemSnapshotStore{
26+
latest: &InmemSnapshotSink{
27+
contents: &bytes.Buffer{},
28+
},
29+
}
30+
}
31+
32+
// Create replaces the stored snapshot with a new one using the given args
33+
func (m *InmemSnapshotStore) Create(version SnapshotVersion, index, term uint64,
34+
configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error) {
35+
// We only support version 1 snapshots at this time.
36+
if version != 1 {
37+
return nil, fmt.Errorf("unsupported snapshot version %d", version)
38+
}
39+
40+
name := snapshotName(term, index)
41+
42+
sink := m.latest
43+
sink.meta = SnapshotMeta{
44+
Version: version,
45+
ID: name,
46+
Index: index,
47+
Term: term,
48+
Peers: encodePeers(configuration, trans),
49+
Configuration: configuration,
50+
ConfigurationIndex: configurationIndex,
51+
}
52+
sink.contents = &bytes.Buffer{}
53+
m.hasSnapshot = true
54+
55+
return sink, nil
56+
}
57+
58+
// List returns the latest snapshot taken
59+
func (m *InmemSnapshotStore) List() ([]*SnapshotMeta, error) {
60+
if !m.hasSnapshot {
61+
return []*SnapshotMeta{}, nil
62+
}
63+
return []*SnapshotMeta{&m.latest.meta}, nil
64+
}
65+
66+
// Open wraps an io.ReadCloser around the snapshot contents
67+
func (m *InmemSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) {
68+
if m.latest.meta.ID != id {
69+
return nil, nil, fmt.Errorf("[ERR] snapshot: failed to open snapshot id: %s", id)
70+
}
71+
72+
return &m.latest.meta, ioutil.NopCloser(m.latest.contents), nil
73+
}
74+
75+
// Write appends the given bytes to the snapshot contents
76+
func (s *InmemSnapshotSink) Write(p []byte) (n int, err error) {
77+
written, err := io.Copy(s.contents, bytes.NewReader(p))
78+
s.meta.Size += written
79+
return int(written), err
80+
}
81+
82+
// Close updates the Size and is otherwise a no-op
83+
func (s *InmemSnapshotSink) Close() error {
84+
return nil
85+
}
86+
87+
func (s *InmemSnapshotSink) ID() string {
88+
return s.meta.ID
89+
}
90+
91+
func (s *InmemSnapshotSink) Cancel() error {
92+
return nil
93+
}

inmem_snapshot_test.go

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package raft
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"reflect"
7+
"testing"
8+
)
9+
10+
func TestInmemSnapshotStoreImpl(t *testing.T) {
11+
var impl interface{} = &InmemSnapshotStore{}
12+
if _, ok := impl.(SnapshotStore); !ok {
13+
t.Fatalf("InmemSnapshotStore not a SnapshotStore")
14+
}
15+
}
16+
17+
func TestInmemSnapshotSinkImpl(t *testing.T) {
18+
var impl interface{} = &InmemSnapshotSink{}
19+
if _, ok := impl.(SnapshotSink); !ok {
20+
t.Fatalf("InmemSnapshotSink not a SnapshotSink")
21+
}
22+
}
23+
24+
func TestInmemSS_CreateSnapshot(t *testing.T) {
25+
snap := NewInmemSnapshotStore()
26+
27+
// Check no snapshots
28+
snaps, err := snap.List()
29+
if err != nil {
30+
t.Fatalf("err: %v", err)
31+
}
32+
if len(snaps) != 0 {
33+
t.Fatalf("did not expect any snapshots: %v", snaps)
34+
}
35+
36+
// Create a new sink
37+
var configuration Configuration
38+
configuration.Servers = append(configuration.Servers, Server{
39+
Suffrage: Voter,
40+
ID: ServerID("my id"),
41+
Address: ServerAddress("over here"),
42+
})
43+
_, trans := NewInmemTransport(NewInmemAddr())
44+
sink, err := snap.Create(SnapshotVersionMax, 10, 3, configuration, 2, trans)
45+
if err != nil {
46+
t.Fatalf("err: %v", err)
47+
}
48+
49+
// The sink is not done, should not be in a list!
50+
snaps, err = snap.List()
51+
if err != nil {
52+
t.Fatalf("err: %v", err)
53+
}
54+
if len(snaps) != 1 {
55+
t.Fatalf("should always be 1 snapshot: %v", snaps)
56+
}
57+
58+
// Write to the sink
59+
_, err = sink.Write([]byte("first\n"))
60+
if err != nil {
61+
t.Fatalf("err: %v", err)
62+
}
63+
_, err = sink.Write([]byte("second\n"))
64+
if err != nil {
65+
t.Fatalf("err: %v", err)
66+
}
67+
68+
// Done!
69+
err = sink.Close()
70+
if err != nil {
71+
t.Fatalf("err: %v", err)
72+
}
73+
74+
// Should have a snapshot!
75+
snaps, err = snap.List()
76+
if err != nil {
77+
t.Fatalf("err: %v", err)
78+
}
79+
if len(snaps) != 1 {
80+
t.Fatalf("expect a snapshots: %v", snaps)
81+
}
82+
83+
// Check the latest
84+
latest := snaps[0]
85+
if latest.Index != 10 {
86+
t.Fatalf("bad snapshot: %v", *latest)
87+
}
88+
if latest.Term != 3 {
89+
t.Fatalf("bad snapshot: %v", *latest)
90+
}
91+
if !reflect.DeepEqual(latest.Configuration, configuration) {
92+
t.Fatalf("bad snapshot: %v", *latest)
93+
}
94+
if latest.ConfigurationIndex != 2 {
95+
t.Fatalf("bad snapshot: %v", *latest)
96+
}
97+
if latest.Size != 13 {
98+
t.Fatalf("bad snapshot: %v", *latest)
99+
}
100+
101+
// Read the snapshot
102+
_, r, err := snap.Open(latest.ID)
103+
if err != nil {
104+
t.Fatalf("err: %v", err)
105+
}
106+
107+
// Read out everything
108+
var buf bytes.Buffer
109+
if _, err := io.Copy(&buf, r); err != nil {
110+
t.Fatalf("err: %v", err)
111+
}
112+
if err := r.Close(); err != nil {
113+
t.Fatalf("err: %v", err)
114+
}
115+
116+
// Ensure a match
117+
if bytes.Compare(buf.Bytes(), []byte("first\nsecond\n")) != 0 {
118+
t.Fatalf("content mismatch")
119+
}
120+
}

0 commit comments

Comments
 (0)