-
Notifications
You must be signed in to change notification settings - Fork 101
/
Copy pathdatavalidator.go
440 lines (378 loc) · 14.7 KB
/
datavalidator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
// Copyright (c) 2018 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package validator
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"time"
"github.com/gardener/etcd-backup-restore/pkg/etcdutil"
"github.com/gardener/etcd-backup-restore/pkg/etcdutil/client"
"github.com/gardener/etcd-backup-restore/pkg/miscellaneous"
"github.com/gardener/etcd-backup-restore/pkg/snapstore"
brtypes "github.com/gardener/etcd-backup-restore/pkg/types"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/snap"
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/wal"
"go.etcd.io/etcd/wal/walpb"
"go.uber.org/zap"
)
// Limits handed to the embedded etcd that is started for the full revision
// consistency check.
const (
// defaultMaxRequestBytes is the request size limit, in bytes.
defaultMaxRequestBytes = 10 * 1024 * 1024 // 10 MiB
// defaultMaxTxnOps is the limit on the number of operations in a transaction.
defaultMaxTxnOps = 10 * 1024
)
// Sentinel errors reported while verifying the Bolt database file.
var (
// ErrPathRequired is returned when the path to a Bolt database is not specified.
ErrPathRequired = errors.New("path required")
// ErrFileNotFound is returned when a Bolt database does not exist.
ErrFileNotFound = errors.New("file not found")
// ErrCorrupt is returned when checking a data file finds errors.
ErrCorrupt = errors.New("invalid value")
)
// memberDir returns the "member" directory under the configured data directory.
func (d *DataValidator) memberDir() string {
	return filepath.Join(d.Config.DataDir, "member")
}

// walDir returns the "wal" directory inside the member directory.
func (d *DataValidator) walDir() string {
	return filepath.Join(d.Config.DataDir, "member", "wal")
}

// snapDir returns the "snap" directory inside the member directory.
func (d *DataValidator) snapDir() string {
	return filepath.Join(d.Config.DataDir, "member", "snap")
}

// backendPath returns the path of the "db" file inside the snap directory.
func (d *DataValidator) backendPath() string {
	return filepath.Join(d.Config.DataDir, "member", "snap", "db")
}
// Validate checks the etcd data directory and reports its status.
//
// The sanity check always runs first. When it reports anything other than
// DataDirectoryValid, that status and error are returned, except when the
// original cluster has more than one member and the backup bucket is not
// empty, in which case DataDirStatusUnknownInMultiNode is returned with a nil
// error. In Full mode the snap, WAL and db files are additionally checked for
// corruption.
func (d *DataValidator) Validate(mode Mode, failBelowRevision int64) (DataDirStatus, error) {
	sanityStatus, sanityErr := d.sanityCheck(failBelowRevision)
	if sanityStatus != DataDirectoryValid {
		// TODO: To be removed when backup-restore supports restoration of single member in multi-node etcd cluster.
		multiNode := d.OriginalClusterSize > 1
		if multiNode && !miscellaneous.IsBackupBucketEmpty(d.Config.SnapstoreConfig, d.Logger) {
			return DataDirStatusUnknownInMultiNode, nil
		}
		return sanityStatus, sanityErr
	}

	// Anything but Full mode stops after the sanity check.
	if mode != Full {
		d.Logger.Info("Data directory valid.")
		return DataDirectoryValid, nil
	}

	d.Logger.Info("Checking for data directory files corruption...")
	corruptionErr := d.checkForDataCorruption()
	switch {
	case corruptionErr == nil:
		d.Logger.Info("Data directory valid.")
		return DataDirectoryValid, nil
	case errors.Is(corruptionErr, bolt.ErrTimeout):
		// A timeout while opening the db file is reported separately from corruption.
		d.Logger.Errorf("another etcd process is using %v and holds the file lock", d.backendPath())
		return FailToOpenBoltDBError, corruptionErr
	default:
		d.Logger.Infof("Data directory corrupt. %v", corruptionErr)
		return DataDirectoryCorrupt, nil
	}
}
// sanityCheck runs the checks that are performed in every validation mode.
//
// In order, it:
//  1. guards against a wrong volume mount by comparing the content of the
//     safe guard file in the parent of the data directory with the pod
//     namespace environment variable (skipped with a warning when the
//     variable is empty; the file is created when it does not exist yet);
//  2. verifies that the data directory exists and contains the member, snap
//     and wal sub-directories;
//  3. unless no snapstore is configured or the cluster has more than one
//     member, compares the revision stored in the backend db file with the
//     latest snapshot revision, falling back to a full check with an embedded
//     etcd when the db file is behind the snapshot.
//
// It returns the resulting DataDirStatus. The error is non-nil for the
// wrong-volume, missing-directory, status-unknown and bolt-lock-timeout
// outcomes, and for errors reported by the revision checks.
func (d *DataValidator) sanityCheck(failBelowRevision int64) (DataDirStatus, error) {
	// The safe guard file lives in the parent of the data directory.
	mntDataDir := path.Dir(d.Config.DataDir)
	safeGuardPath := filepath.Join(mntDataDir, safeGuard)

	namespace := os.Getenv(podNamespace)
	if namespace == "" {
		d.Logger.Warn("POD_NAMESPACE environment variable is not set. The variable is used to safe guard against wrong volume mount")
	} else {
		// create the file `safe_guard` if it doesn't exist
		if _, statErr := os.Stat(safeGuardPath); statErr != nil {
			if errors.Is(statErr, os.ErrNotExist) {
				if writeErr := os.WriteFile(safeGuardPath, []byte(namespace), 0644); writeErr != nil {
					d.Logger.Fatalf("can't create `safe_guard` file because : %v", writeErr)
				}
			} else {
				d.Logger.Fatalf("can't check if the `safe_guard` file exists or not because : %v", statErr)
			}
		}

		// read the content of the file safe_guard and match it with the environment variable
		content, err := os.ReadFile(safeGuardPath)
		if err != nil {
			return WrongVolumeMounted, fmt.Errorf("can't read the content of the `safe_guard` file to determine if a wrong volume is mounted: %w", err)
		}
		if string(content) != namespace {
			return WrongVolumeMounted, fmt.Errorf("wrong volume is mounted. The shoot name derived from namespace is %s and the content of `safe_guard` file at %s is %s", namespace, safeGuardPath, string(content))
		}
	}

	dataDir := d.Config.DataDir
	dirExists, err := directoryExist(dataDir)
	if err != nil {
		return DataDirectoryStatusUnknown, err
	}
	if !dirExists {
		return DataDirectoryNotExist, fmt.Errorf("directory does not exist: %s", dataDir)
	}

	d.Logger.Info("Checking for data directory structure validity...")
	etcdDirStructValid, err := d.hasEtcdDirectoryStructure()
	if err != nil {
		return DataDirectoryStatusUnknown, err
	}
	if !etcdDirStructValid {
		d.Logger.Infof("Data directory structure invalid.")
		return DataDirectoryInvStruct, nil
	}

	if d.Config.SnapstoreConfig == nil || len(d.Config.SnapstoreConfig.Provider) == 0 {
		d.Logger.Info("Skipping check for revision consistency, since no snapstore configured.")
		return DataDirectoryValid, nil
	}

	if d.OriginalClusterSize > 1 {
		d.Logger.Info("Skipping check for revision consistency of etcd member as it will get in sync with etcd leader.")
		return DataDirectoryValid, nil
	}

	etcdRevision, err := getLatestEtcdRevision(d.backendPath())
	if err != nil {
		// A lock timeout is reported as its own status; any other failure to
		// read the revision is treated as a corrupt data directory.
		if errors.Is(err, bolt.ErrTimeout) {
			d.Logger.Errorf("another etcd process is using %v and holds the file lock", d.backendPath())
			return FailToOpenBoltDBError, err
		}
		d.Logger.Infof("unable to get current etcd revision from backend db file: %v", err)
		return DataDirectoryCorrupt, nil
	}

	d.Logger.Info("Checking for etcd revision consistency...")
	etcdRevisionStatus, latestSnapshotRevision, err := d.checkEtcdDataRevisionConsistency(etcdRevision, failBelowRevision)

	// If the etcd revision is inconsistent with the latest snapshot revision,
	// re-check it by starting an embedded etcd, since the WAL files can hold
	// uncommitted data that was not yet flushed to the Bolt DB.
	if etcdRevisionStatus == RevisionConsistencyError {
		d.Logger.Info("Checking for Full revision consistency...")
		return d.checkFullRevisionConsistency(dataDir, latestSnapshotRevision)
	}

	return etcdRevisionStatus, err
}
// checkForDataCorruption verifies the snap directory, the WAL directory and
// the backend db file, in that order, and returns the first failure.
//
// A missing snapshot (snap.ErrNoSnapshot) is not treated as corruption; the
// WAL is then verified against a zero-valued WAL snapshot. A bolt.ErrTimeout
// from opening the db file is returned unwrapped so that callers can tell a
// held file lock apart from corruption; every other failure is wrapped with
// a message naming the component that failed.
func (d *DataValidator) checkForDataCorruption() error {
	var walsnap walpb.Snapshot

	d.Logger.Info("Verifying snap directory...")
	snapshot, err := d.verifySnapDir()
	if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
		return fmt.Errorf("invalid snapshot files: %w", err)
	}

	// Verify the WAL relative to the loaded snapshot, when there is one.
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
	}

	d.Logger.Info("Verifying WAL directory...")
	if err := verifyWALDir(d.ZapLogger, d.walDir(), walsnap); err != nil {
		return fmt.Errorf("invalid wal files: %w", err)
	}

	d.Logger.Info("Verifying DB file...")
	if err := verifyDB(d.backendPath()); err != nil {
		if errors.Is(err, bolt.ErrTimeout) {
			return err
		}
		return fmt.Errorf("invalid db files: %w", err)
	}

	return nil
}
// hasEtcdDirectoryStructure reports whether the member, snap and wal
// directories all exist. Each of the three paths is probed in that order; the
// first probe that fails with an error aborts the check and returns that error.
func (d *DataValidator) hasEtcdDirectoryStructure() (bool, error) {
	allPresent := true
	for _, dir := range []string{d.memberDir(), d.snapDir(), d.walDir()} {
		present, err := directoryExist(dir)
		if err != nil {
			return false, err
		}
		// Keep probing the remaining paths even when one is missing, so
		// that an error on a later path is still reported.
		allPresent = allPresent && present
	}
	return allPresent, nil
}
// directoryExist reports whether dir can be stat'ed. It returns (false, nil)
// when the path does not exist and (false, err) for any other stat failure.
// Note that the entry is not checked for actually being a directory.
func directoryExist(dir string) (bool, error) {
	_, statErr := os.Stat(dir)
	switch {
	case statErr == nil:
		return true, nil
	case os.IsNotExist(statErr):
		return false, nil
	default:
		return false, statErr
	}
}
// verifySnapDir asks an etcd snapshotter created for the snap directory to
// load a snapshot and returns its result unchanged.
func (d *DataValidator) verifySnapDir() (*raftpb.Snapshot, error) {
	snapshotter := snap.New(d.ZapLogger, d.snapDir())
	loaded, err := snapshotter.Load()
	return loaded, err
}
// verifyWALDir verifies the WAL files in waldir against snap. When the
// verification fails with io.ErrUnexpectedEOF, one repair is attempted and the
// verification is run again; any other error, a failed repair, or a failure
// after the repair is returned to the caller. Progress is printed to stdout.
func verifyWALDir(logger *zap.Logger, waldir string, snap walpb.Snapshot) error {
	for attempt := 0; ; attempt++ {
		verifyErr := wal.Verify(logger, waldir, snap)
		if verifyErr == nil {
			return nil
		}

		// we can only repair ErrUnexpectedEOF and we never repair twice.
		alreadyRepaired := attempt > 0
		if alreadyRepaired || verifyErr != io.ErrUnexpectedEOF {
			fmt.Printf("read wal error (%v) and cannot be repaired.\n", verifyErr)
			return verifyErr
		}

		if !wal.Repair(logger, waldir) {
			fmt.Printf("WAL error (%v) cannot be repaired.\n", verifyErr)
			return verifyErr
		}
		fmt.Printf("repaired WAL error (%v).\n", verifyErr)
	}
}
// verifyDB opens the Bolt database at path and runs its consistency check.
//
// It returns ErrPathRequired for an empty path, ErrFileNotFound when the file
// does not exist, the error from bolt.Open when the database cannot be opened
// within timeoutToOpenBoltDB, and ErrCorrupt when the check reports at least
// one problem.
func verifyDB(path string) error {
	if path == "" {
		return ErrPathRequired
	}
	if _, err := os.Stat(path); os.IsNotExist(err) {
		return ErrFileNotFound
	}

	db, err := bolt.Open(path, 0666, &bolt.Options{Timeout: timeoutToOpenBoltDB})
	if err != nil {
		return err
	}
	defer db.Close()

	return db.View(func(tx *bolt.Tx) error {
		// Drain the whole channel returned by Check, counting every reported problem.
		problems := 0
		for range tx.Check() {
			problems++
		}
		if problems != 0 {
			return ErrCorrupt
		}
		return nil
	})
}
// checkEtcdDataRevisionConsistency compares etcdRevision, the latest revision
// read from the etcd db file, with the latest snapshot revision in the
// snapstore. The latest snapshot revision is taken from the last delta
// snapshot if there is one, otherwise from the latest full snapshot.
//
// It returns:
//   - DataDirectoryStatusUnknown and an error when the snapstore or its
//     snapshot list cannot be obtained;
//   - FailBelowRevisionConsistencyError when no snapshot exists and
//     etcdRevision is below failBelowRevision;
//   - RevisionConsistencyError when etcdRevision is below the latest snapshot
//     revision;
//   - DataDirectoryValid otherwise.
//
// The second return value is the latest snapshot revision (0 when none was found).
func (d *DataValidator) checkEtcdDataRevisionConsistency(etcdRevision, failBelowRevision int64) (DataDirStatus, int64, error) {
	var latestSnapshotRevision int64

	store, err := snapstore.GetSnapstore(d.Config.SnapstoreConfig)
	if err != nil {
		return DataDirectoryStatusUnknown, latestSnapshotRevision, fmt.Errorf("unable to fetch snapstore: %v", err)
	}

	fullSnap, deltaSnaps, err := miscellaneous.GetLatestFullSnapshotAndDeltaSnapList(store)
	if err != nil {
		return DataDirectoryStatusUnknown, latestSnapshotRevision, fmt.Errorf("unable to get snapshots from store: %v", err)
	}

	switch deltaCount := len(deltaSnaps); {
	case deltaCount != 0:
		latestSnapshotRevision = deltaSnaps[deltaCount-1].LastRevision
	case fullSnap != nil:
		latestSnapshotRevision = fullSnap.LastRevision
	default:
		// With no snapshot at all, only the fail-below revision can be checked.
		d.Logger.Infof("No snapshot found.")
		if etcdRevision < failBelowRevision {
			d.Logger.Infof("current etcd revision (%d) is less than fail below revision (%d): possible data loss", etcdRevision, failBelowRevision)
			return FailBelowRevisionConsistencyError, latestSnapshotRevision, nil
		}
		return DataDirectoryValid, latestSnapshotRevision, nil
	}

	if etcdRevision < latestSnapshotRevision {
		d.Logger.Infof("current etcd revision (%d) is less than latest snapshot revision (%d)", etcdRevision, latestSnapshotRevision)
		return RevisionConsistencyError, latestSnapshotRevision, nil
	}

	return DataDirectoryValid, latestSnapshotRevision, nil
}
// checkFullRevisionConsistency starts an embedded etcd on dataDir and polls
// its revision until it reaches latestSnapshotRevision or
// embeddedEtcdPingLimitDuration elapses, pausing one second between polls.
//
// It returns DataDirectoryCorrupt with an error when the embedded etcd or its
// KV client cannot be created, RevisionConsistencyError when the last polled
// revision is still below latestSnapshotRevision once polling stops, and
// DataDirectoryValid otherwise. The embedded etcd and the client are shut
// down before returning.
func (d *DataValidator) checkFullRevisionConsistency(dataDir string, latestSnapshotRevision int64) (DataDirStatus, error) {
	d.Logger.Info("Starting embedded etcd server...")
	restoreOpts := &brtypes.RestoreOptions{
		Config: &brtypes.RestorationConfig{
			RestoreDataDir:         dataDir,
			EmbeddedEtcdQuotaBytes: d.Config.EmbeddedEtcdQuotaBytes,
			MaxRequestBytes:        defaultMaxRequestBytes,
			MaxTxnOps:              defaultMaxTxnOps,
		},
	}

	embedded, err := miscellaneous.StartEmbeddedEtcd(logrus.NewEntry(d.Logger), restoreOpts)
	if err != nil {
		d.Logger.Infof("unable to start embedded etcd: %v", err)
		return DataDirectoryCorrupt, err
	}
	defer func() {
		embedded.Server.Stop()
		embedded.Close()
	}()

	// Talk to the embedded etcd through its first client listener.
	connCfg := brtypes.EtcdConnectionConfig{
		Endpoints:         []string{embedded.Clients[0].Addr().String()},
		InsecureTransport: true,
	}
	clientKV, err := etcdutil.NewClientFactory(nil, connCfg).NewKV()
	if err != nil {
		d.Logger.Infof("unable to get the embedded etcd KV client: %v", err)
		return DataDirectoryCorrupt, err
	}
	defer clientKV.Close()

	deadline := time.NewTimer(embeddedEtcdPingLimitDuration)
	defer deadline.Stop()

	var latestSyncedEtcdRevision int64
	for polling := true; polling; {
		select {
		case <-deadline.C:
			polling = false
		default:
			// A failed lookup yields revision 0 and is simply retried on the next round.
			latestSyncedEtcdRevision, _ = getLatestSyncedRevision(clientKV)
			if latestSyncedEtcdRevision >= latestSnapshotRevision {
				d.Logger.Infof("After starting embeddedEtcd backend DB file revision (%d) is greater than or equal to latest snapshot revision (%d): no data loss", latestSyncedEtcdRevision, latestSnapshotRevision)
				polling = false
			} else {
				time.Sleep(1 * time.Second)
			}
		}
	}

	if latestSyncedEtcdRevision < latestSnapshotRevision {
		d.Logger.Infof("After starting embeddedEtcd backend DB file revision (%d) is less than latest snapshot revision (%d): possible data loss", latestSyncedEtcdRevision, latestSnapshotRevision)
		return RevisionConsistencyError, nil
	}

	return DataDirectoryValid, nil
}
// getLatestEtcdRevision reads the latest revision straight from the etcd db
// file at path, without starting an etcd server or an embedded etcd.
//
// The db file is opened read-only. The revision is decoded from the first
// eight bytes (big endian) of the last key in the "key" bucket; when that key
// is shorter than eight bytes, which includes an empty bucket, the revision
// is reported as 1. On any failure the returned revision is -1.
func getLatestEtcdRevision(path string) (int64, error) {
	if _, statErr := os.Stat(path); statErr != nil {
		return -1, fmt.Errorf("unable to stat backend db file: %v", statErr)
	}

	openOpts := &bolt.Options{Timeout: timeoutToOpenBoltDB, ReadOnly: true}
	db, err := bolt.Open(path, 0400, openOpts)
	if err != nil {
		return -1, err
	}
	defer db.Close()

	var rev int64
	readLastRevision := func(tx *bolt.Tx) error {
		keyBucket := tx.Bucket([]byte("key"))
		if keyBucket == nil {
			return fmt.Errorf("cannot get hash of bucket \"key\"")
		}

		lastKey, _ := keyBucket.Cursor().Last()
		if len(lastKey) >= 8 {
			rev = int64(binary.BigEndian.Uint64(lastKey[0:8]))
		} else {
			rev = 1
		}
		return nil
	}

	if err := db.View(readLastRevision); err != nil {
		return -1, err
	}
	return rev, nil
}
// getLatestSyncedRevision asks a running (embedded) etcd for its current
// revision, taken from the header of the Get response. The request is bounded
// by connectionTimeout. On failure the error is printed to stdout and
// returned together with revision 0.
func getLatestSyncedRevision(client client.KVCloser) (int64, error) {
	ctx, cancel := context.WithTimeout(context.TODO(), connectionTimeout)
	defer cancel()

	resp, err := client.Get(ctx, "", clientv3.WithLastRev()...)
	if err != nil {
		fmt.Printf("Failed to get the latest etcd revision: %v\n", err)
		return 0, err
	}

	return resp.Header.Revision, nil
}