-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlitestream.go
160 lines (134 loc) · 4.36 KB
/
litestream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package main
import (
"errors"
"net"
"net/http"
"os"
"time"
"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/s3"
_ "github.com/mattn/go-sqlite3"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Config groups the settings for a Litestream run: where metrics are served,
// which databases are managed, an optional subcommand, and S3 credentials
// shared at the global level.
type Config struct {
	// Addr is the bind address used for serving metrics.
	Addr string `yaml:"addr"`

	// DBs is the list of databases to manage.
	DBs []*DBConfig `yaml:"dbs"`

	// Exec is a subcommand to execute during replication.
	// Litestream shuts down when the subcommand exits.
	Exec string `yaml:"exec"`

	// Global S3 settings.
	AccessKeyID     string `yaml:"access-key-id"`
	SecretAccessKey string `yaml:"secret-access-key"`
}
// ReplicaConfig describes one replica destination for a database.
// Interval and retention fields are pointers so that an unset value (nil)
// can be told apart from an explicit zero.
type ReplicaConfig struct {
	Type string `yaml:"type"` // "file", "s3"
	Name string `yaml:"name"` // name of replica, optional.
	Path string `yaml:"path"`
	URL  string `yaml:"url"`

	// Timing knobs; nil means the field was not provided.
	Retention              *time.Duration `yaml:"retention"`
	RetentionCheckInterval *time.Duration `yaml:"retention-check-interval"`
	SyncInterval           *time.Duration `yaml:"sync-interval"`
	SnapshotInterval       *time.Duration `yaml:"snapshot-interval"`
	ValidationInterval     *time.Duration `yaml:"validation-interval"`

	// S3 settings.
	AccessKeyID     string `yaml:"access-key-id"`
	SecretAccessKey string `yaml:"secret-access-key"`
	Region          string `yaml:"region"`
	Bucket          string `yaml:"bucket"`
	Endpoint        string `yaml:"endpoint"`
	ForcePathStyle  *bool  `yaml:"force-path-style"`
	SkipVerify      bool   `yaml:"skip-verify"`

	// ABS settings.
	AccountName string `yaml:"account-name"`
	AccountKey  string `yaml:"account-key"`

	// SFTP settings.
	Host     string `yaml:"host"`
	User     string `yaml:"user"`
	Password string `yaml:"password"`
	KeyPath  string `yaml:"key-path"`
}
// ReplicaType reports which kind of replica this configuration describes.
// A non-empty scheme parsed from URL takes precedence, then the explicit
// Type field; "file" is the fallback when neither is set.
func (c *ReplicaConfig) ReplicaType() string {
	// Only the first result of ParseReplicaURL is consulted; the other
	// three are deliberately discarded.
	scheme, _, _, _ := ParseReplicaURL(c.URL)

	switch {
	case scheme != "":
		return scheme
	case c.Type != "":
		return c.Type
	default:
		return "file"
	}
}
// DBConfig describes a single managed database: the path of the database
// file, optional tuning values, and the replicas it is copied to.
// Pointer-typed fields are nil when the value was not provided.
type DBConfig struct {
	// Path is the location of the database file.
	Path string `yaml:"path"`

	// Optional monitoring and checkpoint tuning.
	MonitorInterval      *time.Duration `yaml:"monitor-interval"`
	MonitorDelayInterval *time.Duration `yaml:"monitor-delay-interval"`
	CheckpointInterval   *time.Duration `yaml:"checkpoint-interval"`
	MinCheckpointPageN   *int           `yaml:"min-checkpoint-page-count"`
	MaxCheckpointPageN   *int           `yaml:"max-checkpoint-page-count"`
	ShadowRetentionN     *int           `yaml:"shadow-retention-count"`

	// Replicas lists the replica destinations for this database.
	Replicas []*ReplicaConfig `yaml:"replicas"`
}
// execCh is a package-level error channel. It is declared here without being
// initialized, and nothing in the visible part of this file sends on or
// receives from it.
// NOTE(review): presumably it reports the exit of the Config.Exec subcommand —
// confirm against the code that uses it.
var execCh chan error
// StartLitestream opens the database at config.DBLocation with a single
// replica pointing at config.S3URL (using config.AccessKeyID and
// config.SecretAccessKey) and, when config.URL is non-empty, starts an HTTP
// server on that address exposing Prometheus metrics at /metrics.
//
// It returns an error when:
//   - config.DBLocation cannot be stat'ed,
//   - config.URL is set but does not contain a port,
//   - the database cannot be built from its configuration or opened.
//
// The bind address is validated before any database is opened, and an
// invalid address is reported through the returned error rather than by
// terminating the process, so the caller decides how to react.
//
// The metrics server runs in a background goroutine for the remaining
// lifetime of the process; a failure to listen is only logged.
func StartLitestream(config LitestreamConfig) error {
	log.Print("Starting main...")

	// Fail early if the database file is missing or inaccessible.
	if _, err := os.Stat(config.DBLocation); err != nil {
		return err
	}

	repConf := ReplicaConfig{
		URL:             config.S3URL,
		AccessKeyID:     config.AccessKeyID,
		SecretAccessKey: config.SecretAccessKey,
	}
	dbConf := DBConfig{
		Path:     config.DBLocation,
		Replicas: []*ReplicaConfig{&repConf},
	}
	conf := Config{
		Addr: config.URL,
		DBs:  []*DBConfig{&dbConf},
	}

	// Validate the metrics address up front so that a bad address does not
	// leave opened databases behind.
	var hostport string
	if conf.Addr != "" {
		var err error
		if hostport, err = metricsHostPort(conf.Addr); err != nil {
			return err
		}
	}

	dbs := make([]*litestream.DB, 0, len(conf.DBs))
	for _, dbConfig := range conf.DBs {
		db, err := NewDBFromConfig(dbConfig)
		if err != nil {
			return err
		}

		// Open database & attach to program.
		if err := db.Open(); err != nil {
			return err
		}
		dbs = append(dbs, db)
	}

	// Report what was set up; S3 replicas get a more detailed line.
	for _, db := range dbs {
		log.Printf("initialized db: %s", db.Path())
		for _, r := range db.Replicas {
			switch client := r.Client.(type) {
			case *s3.ReplicaClient:
				log.Printf("replicating to: name=%q type=%q bucket=%q path=%q region=%q endpoint=%q sync-interval=%s", r.Name(), client.Type(), client.Bucket, client.Path, client.Region, client.Endpoint, r.SyncInterval)
			default:
				log.Printf("replicating to: name=%q type=%q", r.Name(), client.Type())
			}
		}
	}

	if conf.Addr != "" {
		log.Printf("serving metrics on http://%s/metrics", hostport)
		go func() {
			// The handler is registered on http.DefaultServeMux, which is
			// what ListenAndServe uses when given a nil handler.
			http.Handle("/metrics", promhttp.Handler())
			if err := http.ListenAndServe(conf.Addr, nil); err != nil {
				log.Printf("cannot start metrics server: %s", err)
			}
		}()
	}

	log.Printf("litestream initialization complete")
	return nil
}

// metricsHostPort validates a metrics bind address and returns the host:port
// to show in log output, substituting "localhost" when the host part is empty.
func metricsHostPort(addr string) (string, error) {
	// SplitHostPort returns empty strings together with any error, so
	// checking the port alone also rejects malformed addresses.
	host, port, _ := net.SplitHostPort(addr)
	if port == "" {
		return "", errors.New("must specify port for bind address: \"" + addr + "\"")
	}
	if host == "" {
		return net.JoinHostPort("localhost", port), nil
	}
	return addr, nil
}