-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathraft.go
112 lines (94 loc) · 3.11 KB
/
raft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
/*
Package raft implements the Raft consensus algorithm.
*/
package raft
import (
"math/rand"
"os"
"time"
pb "github.com/bbengfort/raft/api/v1beta1"
"github.com/bbengfort/x/noplog"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"google.golang.org/grpc/grpclog"
)
//===========================================================================
// Package Initialization
//===========================================================================
// Initialize the package and random numbers, etc.
func init() {
// Set the random seed to something different each time.
rand.Seed(time.Now().UnixNano())
// Stop the grpc verbose logging
//lint:ignore SA1019 noplog doesn't implement the V2 interface
grpclog.SetLogger(noplog.New())
// Initialize zerolog for server-side process logging
zerolog.TimeFieldFormat = time.RFC3339Nano
log.Logger = zerolog.New(os.Stdout).With().Timestamp().Logger()
}
// StateMachine implements a handler for applying commands when they are
// committed or for dropping commands if they are truncated from the log.
type StateMachine interface {
CommitEntry(entry *pb.LogEntry) error
DropEntry(entry *pb.LogEntry) error
}
//===========================================================================
// New Raft Instance
//===========================================================================
// New Raft replica with the specified config.
func New(options *Config) (replica *Replica, err error) {
// Create a new configuration from defaults, configuration file, and
// the environment; then verify it, returning any errors.
config := new(Config)
if err = config.Load(); err != nil {
return nil, err
}
// Update the configuration with the passed in configuration.
if err = config.Update(options); err != nil {
return nil, err
}
// Configure logging (will modify logging globally for all packages!)
zerolog.SetGlobalLevel(config.GetLogLevel())
if config.ConsoleLog {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
}
// Set the random seed
if config.Seed != 0 {
log.Debug().Int64("seed", config.Seed).Msg("setting random seed")
rand.Seed(config.Seed)
}
// Create and initialize the replica
replica = new(Replica)
replica.config = config
replica.remotes = make(map[string]*Remote)
replica.clients = make(map[uint64]chan *pb.CommitReply)
replica.log = NewLog(replica)
replica.Metrics = NewMetrics()
// Create the local replica definition
replica.Peer, err = config.GetPeer()
if err != nil {
return nil, err
}
// Create the remotes from peers
for _, peer := range config.Peers {
// Do not store local host in remotes
if replica.Name == peer.Name {
continue
}
// Create the remote connection and client
replica.remotes[peer.Name], err = NewRemote(peer, replica)
if err != nil {
return nil, err
}
}
// Create the ticker from the configuration
tick, err := config.GetTick()
if err != nil {
return nil, err
}
replica.ticker = NewTicker(replica, tick)
// Set state to initialized
replica.setState(Initialized)
log.Info().Str("name", replica.Name).Int("nPeers", len(replica.remotes)).Msg("raft replica created")
return replica, nil
}