diff --git a/auth/metrics.go b/auth/metrics.go new file mode 100644 index 00000000000..fe0d28e22d5 --- /dev/null +++ b/auth/metrics.go @@ -0,0 +1,42 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package auth + +import ( + "github.com/prometheus/client_golang/prometheus" + "sync" +) + +var ( + currentAuthRevision = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "auth", + Name: "revision", + Help: "The current revision of auth store.", + }, + func() float64 { + reportCurrentAuthRevMu.RLock() + defer reportCurrentAuthRevMu.RUnlock() + return reportCurrentAuthRev() + }, + ) + // overridden by auth store initialization + reportCurrentAuthRevMu sync.RWMutex + reportCurrentAuthRev = func() float64 { return 0 } +) + +func init() { + prometheus.MustRegister(currentAuthRevision) +} diff --git a/auth/store.go b/auth/store.go index 3c153622934..02a345449b5 100644 --- a/auth/store.go +++ b/auth/store.go @@ -94,6 +94,9 @@ type AuthenticateParamIndex struct{} // AuthenticateParamSimpleTokenPrefix is used for a key of context in the parameters of Authenticate() type AuthenticateParamSimpleTokenPrefix struct{} +// saveConsistentIndexFunc is used to sync consistentIndex to backend, now reusing store.saveIndex +type saveConsistentIndexFunc func(tx backend.BatchTx) + // AuthStore defines auth storage interface. type AuthStore interface { // AuthEnable turns on the authentication feature @@ -186,6 +189,9 @@ type AuthStore interface { // HasRole checks that user has role HasRole(user, role string) bool + + // SetConsistentIndexSyncer sets consistentIndex syncer + SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) } type TokenProvider interface { @@ -209,10 +215,14 @@ type authStore struct { rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions - tokenProvider TokenProvider - bcryptCost int // the algorithm cost / strength for hashing auth passwords + tokenProvider TokenProvider + syncConsistentIndex saveConsistentIndexFunc + bcryptCost int // the algorithm cost / strength for hashing auth passwords } +func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) { + as.syncConsistentIndex = syncer +} func (as *authStore) AuthEnable() error { as.enabledMu.Lock() defer as.enabledMu.Unlock() @@ -269,6 +279,7 @@ func (as *authStore) AuthDisable() { tx.Lock() tx.UnsafePut(authBucketName, enableFlagKey, authDisabled) as.commitRevision(tx) + as.saveConsistentIndex(tx) tx.Unlock() b.ForceCommit() @@ -430,6 +441,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, putUser(as.lg, tx, newUser) as.commitRevision(tx) + as.saveConsistentIndex(tx) if as.lg != nil { as.lg.Info("added a user", zap.String("user-name", r.Name)) @@ -461,6 +473,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete delUser(tx, r.Name) as.commitRevision(tx) + as.saveConsistentIndex(tx) as.invalidateCachedPerm(r.Name) as.tokenProvider.invalidateUser(r.Name) @@ -513,6 +526,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p putUser(as.lg, tx, updatedUser) as.commitRevision(tx) + as.saveConsistentIndex(tx) as.invalidateCachedPerm(r.Name) as.tokenProvider.invalidateUser(r.Name) @@ -569,6 +583,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser as.invalidateCachedPerm(r.User) as.commitRevision(tx) + as.saveConsistentIndex(tx) if as.lg != nil { as.lg.Info( @@ -655,6 +670,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs as.invalidateCachedPerm(r.Name) as.commitRevision(tx) + as.saveConsistentIndex(tx) if as.lg != nil { as.lg.Info( @@ -729,6 +745,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) as.clearCachedPerm() as.commitRevision(tx) + as.saveConsistentIndex(tx) if as.lg != nil { as.lg.Info( @@ -788,6 +805,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete } as.commitRevision(tx) + as.saveConsistentIndex(tx) if as.lg != nil { as.lg.Info("deleted a role", zap.String("role-name", r.Role)) @@ -818,6 +836,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, putRole(as.lg, tx, newRole) as.commitRevision(tx) + as.saveConsistentIndex(tx) if as.lg != nil { as.lg.Info("created a role", zap.String("role-name", r.Name)) @@ -881,6 +900,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) ( as.clearCachedPerm() as.commitRevision(tx) + as.saveConsistentIndex(tx) if as.lg != nil { as.lg.Info( @@ -904,8 +924,21 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE if revision == 0 { return ErrUserEmpty } - - if revision < as.Revision() { + rev := as.Revision() + if revision < rev { + if as.lg != nil { + as.lg.Warn("request auth revision is less than current node auth revision", + zap.Uint64("current node auth revision", rev), + zap.Uint64("request auth revision", revision), + zap.ByteString("request key", key), + zap.Error(ErrAuthOldRevision)) + } else { + plog.Warningf("request auth revision is less than current node auth revision,"+ + "current node auth revision is %d,"+ + "request auth revision is %d,"+ + "request key is %s, "+ + "err is %v", rev, revision, key, ErrAuthOldRevision) + } return ErrAuthOldRevision } @@ -1145,6 +1178,8 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo as.commitRevision(tx) } + as.setupMetricsReporter() + tx.Unlock() be.ForceCommit() @@ -1419,3 +1454,23 @@ func (as *authStore) HasRole(user, role string) bool { func (as *authStore) BcryptCost() int { return as.bcryptCost } + +func (as *authStore) saveConsistentIndex(tx backend.BatchTx) { + if as.syncConsistentIndex != nil { + as.syncConsistentIndex(tx) + } else { + if as.lg != nil { + as.lg.Error("failed to save consistentIndex,syncConsistentIndex is nil") + } else { + plog.Error("failed to save consistentIndex,syncConsistentIndex is nil") + } + } +} + +func (as *authStore) setupMetricsReporter() { + reportCurrentAuthRevMu.Lock() + reportCurrentAuthRev = func() float64 { + return float64(as.Revision()) + } + reportCurrentAuthRevMu.Unlock() +} diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 822b5e32204..d98549dea5b 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -116,6 +116,9 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { ar := &applyResult{} defer func(start time.Time) { warnOfExpensiveRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err) + if ar.err != nil { + warnOfFailedRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err) + } }(time.Now()) // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls diff --git a/etcdserver/backend.go b/etcdserver/backend.go index 01ba1925686..3eace1a33c6 100644 --- a/etcdserver/backend.go +++ b/etcdserver/backend.go @@ -102,7 +102,7 @@ func openBackend(cfg ServerConfig) backend.Backend { // case, replace the db with the snapshot db sent by the leader. func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) { var cIndex consistentIndex - kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) defer kv.Close() if snapshot.Metadata.Index <= kv.ConsistentIndex() { return oldbe, nil diff --git a/etcdserver/server.go b/etcdserver/server.go index 4db1998d6fc..07c3e827068 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -540,7 +540,23 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { CheckpointInterval: cfg.LeaseCheckpointInterval, ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), }) - srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + + tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken, + func(index uint64) <-chan struct{} { + return srv.applyWait.Wait(index) + }, + ) + if err != nil { + if cfg.Logger != nil { + cfg.Logger.Warn("failed to create token provider", zap.Error(err)) + } else { + plog.Warningf("failed to create token provider,err is %v", err) + } + return nil, err + } + srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost)) + + srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) if beExist { kvindex := srv.kv.ConsistentIndex() // TODO: remove kvindex != 0 checking when we do not expect users to upgrade @@ -569,20 +585,6 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { }() srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex()) - tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken, - func(index uint64) <-chan struct{} { - return srv.applyWait.Wait(index) - }, - ) - if err != nil { - if cfg.Logger != nil { - cfg.Logger.Warn("failed to create token provider", zap.Error(err)) - } else { - plog.Errorf("failed to create token provider: %s", err) - } - return nil, err - } - srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost)) if num := cfg.AutoCompactionRetention; num != 0 { srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv) if err != nil { diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index c6447102212..784deeb672c 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -984,7 +984,7 @@ func TestSnapshot(t *testing.T) { r: *r, v2store: st, } - srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{}) + srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{}) srv.be = be ch := make(chan struct{}, 2) @@ -1065,7 +1065,7 @@ func TestSnapshotOrdering(t *testing.T) { be, tmpPath := backend.NewDefaultTmpBackend() defer os.RemoveAll(tmpPath) - s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{}) + s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{}) s.be = be s.start() @@ -1126,7 +1126,7 @@ func TestTriggerSnap(t *testing.T) { } srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster} - srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{}) + srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{}) srv.be = be srv.start() @@ -1198,7 +1198,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { defer func() { os.RemoveAll(tmpPath) }() - s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{}) + s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{}) s.be = be s.start() diff --git a/etcdserver/util.go b/etcdserver/util.go index fe5024ef00d..ece1c2ce086 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -111,6 +111,25 @@ func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Strin warnOfExpensiveGenericRequest(lg, now, reqStringer, "", resp, err) } +func warnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) { + var resp string + if !isNil(respMsg) { + resp = fmt.Sprintf("size:%d", proto.Size(respMsg)) + } + d := time.Since(now) + if lg != nil { + lg.Warn( + "failed to apply request", + zap.Duration("took", d), + zap.String("request", reqStringer.String()), + zap.String("response", resp), + zap.Error(err), + ) + } else { + plog.Warningf("failed to apply request %q with response %q took (%v) to execute, err is %v", reqStringer.String(), resp, d, err) + } +} + func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) { reqStringer := pb.NewLoggableTxnRequest(r) var resp string diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index 466040790ff..06f82636b63 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -712,7 +712,7 @@ func TestKVSnapshot(t *testing.T) { func TestWatchableKVWatch(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index a51e5aa529b..72c6b8be4ba 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -15,6 +15,7 @@ package mvcc import ( + "go.etcd.io/etcd/auth" "sync" "time" @@ -69,11 +70,11 @@ type watchableStore struct { // cancel operations. type cancelFunc func() -func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV { - return newWatchableStore(lg, b, le, ig, cfg) +func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV { + return newWatchableStore(lg, b, le, as, ig, cfg) } -func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore { +func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore { s := &watchableStore{ store: NewStore(lg, b, le, ig, cfg), victimc: make(chan struct{}, 1), @@ -87,6 +88,10 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig Co // use this store as the deleter so revokes trigger watch events s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) }) } + if as != nil { + // TODO: encapsulating consistentindex into a separate package + as.SetConsistentIndexSyncer(s.store.saveIndex) + } s.wg.Add(2) go s.syncWatchersLoop() go s.syncVictimsLoop() diff --git a/mvcc/watchable_store_bench_test.go b/mvcc/watchable_store_bench_test.go index 0f553493fa0..1b0581b9820 100644 --- a/mvcc/watchable_store_bench_test.go +++ b/mvcc/watchable_store_bench_test.go @@ -28,7 +28,7 @@ import ( func BenchmarkWatchableStorePut(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{}) + s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -49,7 +49,7 @@ func BenchmarkWatchableStorePut(b *testing.B) { func BenchmarkWatchableStoreTxnPut(b *testing.B) { var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := New(zap.NewExample(), be, &lease.FakeLessor{}, &i, StoreConfig{}) + s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &i, StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -80,7 +80,7 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) { func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) { be, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer cleanup(s, be, tmpPath) k := []byte("testkey") @@ -180,7 +180,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer func() { s.store.Close() diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index e4d0cd62ec5..67d0611f2cf 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -32,7 +32,7 @@ import ( func TestWatch(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer func() { s.store.Close() @@ -54,7 +54,7 @@ func TestWatch(t *testing.T) { func TestNewWatcherCancel(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer func() { s.store.Close() @@ -224,7 +224,7 @@ func TestSyncWatchers(t *testing.T) { // TestWatchCompacted tests a watcher that watches on a compacted revision. func TestWatchCompacted(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer func() { s.store.Close() @@ -261,7 +261,7 @@ func TestWatchCompacted(t *testing.T) { func TestWatchFutureRev(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer func() { s.store.Close() @@ -302,7 +302,7 @@ func TestWatchRestore(t *testing.T) { test := func(delay time.Duration) func(t *testing.T) { return func(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer cleanup(s, b, tmpPath) testKey := []byte("foo") @@ -310,7 +310,7 @@ func TestWatchRestore(t *testing.T) { rev := s.Put(testKey, testValue, lease.NoLease) newBackend, newPath := backend.NewDefaultTmpBackend() - newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, StoreConfig{}) + newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer cleanup(newStore, newBackend, newPath) w := newStore.NewWatchStream() @@ -348,11 +348,11 @@ func TestWatchRestore(t *testing.T) { // 5. choose the watcher from step 1, without panic func TestWatchRestoreSyncedWatcher(t *testing.T) { b1, b1Path := backend.NewDefaultTmpBackend() - s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, StoreConfig{}) + s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer cleanup(s1, b1, b1Path) b2, b2Path := backend.NewDefaultTmpBackend() - s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, StoreConfig{}) + s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer cleanup(s2, b2, b2Path) testKey, testValue := []byte("foo"), []byte("bar") @@ -399,7 +399,7 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) { // TestWatchBatchUnsynced tests batching on unsynced watchers func TestWatchBatchUnsynced(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) oldMaxRevs := watchBatchMaxRevs defer func() { @@ -533,7 +533,7 @@ func TestWatchVictims(t *testing.T) { oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer func() { s.store.Close() @@ -611,7 +611,7 @@ func TestWatchVictims(t *testing.T) { // canceling its watches. func TestStressWatchCancelClose(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer func() { s.store.Close() diff --git a/mvcc/watcher_bench_test.go b/mvcc/watcher_bench_test.go index 901a1ec0d83..f2e6c6f6e96 100644 --- a/mvcc/watcher_bench_test.go +++ b/mvcc/watcher_bench_test.go @@ -26,7 +26,7 @@ import ( func BenchmarkKVWatcherMemoryUsage(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, StoreConfig{}) + watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer cleanup(watchable, be, tmpPath) diff --git a/mvcc/watcher_test.go b/mvcc/watcher_test.go index f3bc9e15954..515d779cfbd 100644 --- a/mvcc/watcher_test.go +++ b/mvcc/watcher_test.go @@ -32,7 +32,7 @@ import ( // and the watched event attaches the correct watchID. func TestWatcherWatchID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -82,7 +82,7 @@ func TestWatcherWatchID(t *testing.T) { func TestWatcherRequestsCustomID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -119,7 +119,7 @@ func TestWatcherRequestsCustomID(t *testing.T) { // and returns events with matching prefixes. func TestWatcherWatchPrefix(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -193,7 +193,7 @@ func TestWatcherWatchPrefix(t *testing.T) { // does not create watcher, which panics when canceling in range tree. func TestWatcherWatchWrongRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -213,7 +213,7 @@ func TestWatcherWatchWrongRange(t *testing.T) { func TestWatchDeleteRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer func() { s.store.Close() @@ -252,7 +252,7 @@ func TestWatchDeleteRange(t *testing.T) { // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -344,7 +344,7 @@ func TestWatcherRequestProgress(t *testing.T) { func TestWatcherWatchWithFilter(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() diff --git a/tools/etcd-dump-db/backend.go b/tools/etcd-dump-db/backend.go index 4ad0aa5ddf2..aa2102c4159 100644 --- a/tools/etcd-dump-db/backend.go +++ b/tools/etcd-dump-db/backend.go @@ -17,6 +17,7 @@ package main import ( "encoding/binary" "fmt" + "go.etcd.io/etcd/auth/authpb" "path/filepath" "go.etcd.io/etcd/lease/leasepb" @@ -52,8 +53,11 @@ func getBuckets(dbPath string) (buckets []string, err error) { type decoder func(k, v []byte) var decoders = map[string]decoder{ - "key": keyDecoder, - "lease": leaseDecoder, + "key": keyDecoder, + "lease": leaseDecoder, + "auth": authDecoder, + "authRoles": authRolesDecoder, + "authUsers": authUsersDecoder, } type revision struct { @@ -93,6 +97,33 @@ func leaseDecoder(k, v []byte) { fmt.Printf("lease ID=%016x, TTL=%ds\n", leaseID, lpb.TTL) } +func authDecoder(k, v []byte) { + if string(k) == "authRevision" { + rev := binary.BigEndian.Uint64(v) + fmt.Printf("key=%q, value=%v\n", k, rev) + } else { + fmt.Printf("key=%q, value=%v\n", k, v) + } +} + +func authRolesDecoder(k, v []byte) { + role := &authpb.Role{} + err := role.Unmarshal(v) + if err != nil { + panic(err) + } + fmt.Printf("role=%q, keyPermission=%v\n", string(role.Name), role.KeyPermission) +} + +func authUsersDecoder(k, v []byte) { + user := &authpb.User{} + err := user.Unmarshal(v) + if err != nil { + panic(err) + } + fmt.Printf("user=%q, roles=%q, password=%q, option=%v\n", user.Name, user.Roles, string(user.Password), user.Options) +} + func iterateBucket(dbPath, bucket string, limit uint64, decode bool) (err error) { db, err := bolt.Open(dbPath, 0600, &bolt.Options{Timeout: flockTimeout}) if err != nil {