diff --git a/messaging/broker.go b/messaging/broker.go index 1ac3ce0754b..2d232a66b4b 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -57,6 +57,14 @@ func (b *Broker) metaPath() string { return filepath.Join(b.path, "meta") } +// Index returns the highest index seen by the broker across all topics. +// Returns 0 if the broker is closed. +func (b *Broker) Index() uint64 { + b.mu.Lock() + b.mu.Unlock() + return b.index +} + func (b *Broker) opened() bool { return b.path != "" } // SetLogOutput sets writer for all Broker log output. @@ -181,9 +189,23 @@ func (b *Broker) load() error { } } - // Set the broker's index to the last index seen across all topics. - b.index = hdr.maxIndex() + // Read the highest index from each of the topic files. + if err := b.loadIndex(); err != nil { + return fmt.Errorf("load index: %s", err) + } + + return nil +} +// loadIndex reads through all topics to find the highest known index. +func (b *Broker) loadIndex() error { + for _, t := range b.topics { + if err := t.loadIndex(); err != nil { + return fmt.Errorf("topic(%d): %s", t.id, err) + } else if t.index > b.index { + b.index = t.index + } + } return nil } @@ -568,12 +590,7 @@ func (fsm *brokerFSM) MustApply(e *raft.LogEntry) { } // Save highest applied index. - // TODO: Persist to disk for raft commands. b.index = e.Index - - // HACK: Persist metadata after each apply. - // This should be derived on startup from the topic logs. - b.mustSave() } // Index returns the highest index that the broker has seen. @@ -774,6 +791,33 @@ func (t *topic) Close() error { return nil } +// loadIndex reads the highest available index for a topic from disk. +func (t *topic) loadIndex() error { + // Open topic file for reading. + f, err := os.Open(t.path) + if os.IsNotExist(err) { + return nil + } else if err != nil { + return err + } + defer func() { _ = f.Close() }() + + // Read all messages. + dec := NewMessageDecoder(bufio.NewReader(f)) + for { + // Decode message. + var m Message + if err := dec.Decode(&m); err == io.EOF { + return nil + } else if err != nil { + return fmt.Errorf("decode: %s", err) + } + + // Update the topic's highest index. + t.index = m.Index + } +} + // writeTo writes the topic to a replica since a given index. // Returns an error if the starting index is unavailable. func (t *topic) writeTo(r *Replica, index uint64) (int64, error) { diff --git a/messaging/broker_test.go b/messaging/broker_test.go index fda760514e6..85f1a86fce4 100644 --- a/messaging/broker_test.go +++ b/messaging/broker_test.go @@ -194,6 +194,34 @@ func TestBroker_Unsubscribe_ErrReplicaNotFound(t *testing.T) { } } +// Ensure the broker can reopen and recover correctly. +func TestBroker_Reopen(t *testing.T) { + b := NewBroker(nil) + defer b.Close() + b.MustCreateReplica(2000, &url.URL{Host: "localhost"}) + b.MustSubscribe(2000, 20) + b.MustSubscribe(2000, 21) + b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")}) + b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")}) + b.MustPublishSync(&messaging.Message{TopicID: 21, Data: []byte("0000")}) + index := b.MustPublishSync(&messaging.Message{TopicID: 20, Data: []byte("0000")}) + time.Sleep(100 * time.Millisecond) + + // Close broker and reopen with a new broker instance. + path, u := b.Path(), b.URL() + b.Broker.Close() + b.Broker = messaging.NewBroker() + if err := b.Broker.Open(path, u); err != nil { + t.Fatal(err) + } + + // Verify the broker is up to date. + newIndex := b.Index() + if newIndex != index { + t.Fatalf("index mismatch: exp=%d, got=%d", index, newIndex) + } +} + // Benchmarks a single broker without HTTP. func BenchmarkBroker_Publish(b *testing.B) { br := NewBroker(nil) @@ -276,6 +304,43 @@ func (b *Broker) MustReadAll(replicaID uint64) (a []*messaging.Message) { return } +// MustCreateReplica creates a new replica. Panic on error. +func (b *Broker) MustCreateReplica(replicaID uint64, u *url.URL) { + if err := b.CreateReplica(replicaID, u); err != nil { + panic(err.Error()) + } +} + +// MustSubscribe subscribes a replica to a topic. Panic on error. +func (b *Broker) MustSubscribe(replicaID, topicID uint64) { + if err := b.Subscribe(replicaID, topicID); err != nil { + panic(err.Error()) + } +} + +// MustSync syncs to a broker index. Panic on error. +func (b *Broker) MustSync(index uint64) { + if err := b.Sync(index); err != nil { + panic(err.Error()) + } +} + +// MustPublish publishes a message to the broker. Panic on error. +func (b *Broker) MustPublish(m *messaging.Message) uint64 { + index, err := b.Publish(&messaging.Message{Type: 100, TopicID: 20, Data: []byte("0000")}) + if err != nil { + panic(err.Error()) + } + return index +} + +// MustPublishSync publishes a message to the broker and syncs to that index. Panic on error. +func (b *Broker) MustPublishSync(m *messaging.Message) uint64 { + index := b.MustPublish(m) + b.MustSync(index) + return index +} + // Messages represents a collection of messages. // This type provides helper functions. type Messages []*messaging.Message diff --git a/raft/log.go b/raft/log.go index 9c674b6e200..c2280597a37 100644 --- a/raft/log.go +++ b/raft/log.go @@ -45,7 +45,7 @@ const logEntryHeaderSize = 8 + 8 + 8 // sz+index+term // WaitInterval represents the amount of time between checks to the applied index. // This is used by clients wanting to wait until a given index is processed. -const WaitInterval = 100 * time.Millisecond +const WaitInterval = 1 * time.Millisecond // State represents whether the log is a follower, candidate, or leader. type State int