From 140bf5321d3b9b0c41d2ca024a4b0fa9a46c55e8 Mon Sep 17 00:00:00 2001 From: tangcong Date: Mon, 24 Feb 2020 17:17:50 +0800 Subject: [PATCH 1/7] *: fix auth revision corruption bug --- auth/store.go | 43 +++++++++++++++++++++++++- auth/store_test.go | 49 ++++++++++++++++++++++++++++++ etcdserver/backend.go | 2 +- etcdserver/server.go | 33 ++++++++++++-------- etcdserver/server_test.go | 8 ++--- mvcc/kv_test.go | 2 +- mvcc/watchable_store.go | 11 +++++-- mvcc/watchable_store_bench_test.go | 8 ++--- mvcc/watchable_store_test.go | 44 +++++++++++++++++++++++++++ mvcc/watcher_bench_test.go | 4 +++ mvcc/watcher_test.go | 12 ++++---- 11 files changed, 184 insertions(+), 32 deletions(-) diff --git a/auth/store.go b/auth/store.go index d676cb5553b..e99ddee51ac 100644 --- a/auth/store.go +++ b/auth/store.go @@ -90,6 +90,10 @@ type AuthenticateParamIndex struct{} // AuthenticateParamSimpleTokenPrefix is used for a key of context in the parameters of Authenticate() type AuthenticateParamSimpleTokenPrefix struct{} +// saveConsistentIndexFunc is used to sync consistentIndex to backend, now reusing store.saveIndex +type saveConsistentIndexFunc func(tx backend.BatchTx) + +// AuthStore defines auth storage interface. type AuthStore interface { // AuthEnable turns on the authentication feature AuthEnable() error @@ -178,6 +182,9 @@ type AuthStore interface { // HasRole checks that user has role HasRole(user, role string) bool + + // SetConsistentIndexSyncer sets consistentIndex syncer + SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) } type TokenProvider interface { @@ -200,9 +207,14 @@ type authStore struct { rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions - tokenProvider TokenProvider + tokenProvider TokenProvider + syncConsistentIndex saveConsistentIndexFunc + bcryptCost int // the algorithm cost / strength for hashing auth passwords } +func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) { + as.syncConsistentIndex = syncer +} func (as *authStore) AuthEnable() error { as.enabledMu.Lock() defer as.enabledMu.Unlock() @@ -252,6 +264,7 @@ func (as *authStore) AuthDisable() { tx.Lock() tx.UnsafePut(authBucketName, enableFlagKey, authDisabled) as.commitRevision(tx) + as.saveConsistentIndex(tx) tx.Unlock() b.ForceCommit() @@ -368,6 +381,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, putUser(tx, newUser) as.commitRevision(tx) + as.saveConsistentIndex(tx) plog.Noticef("added a new user: %s", r.Name) @@ -392,6 +406,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete delUser(tx, r.Name) as.commitRevision(tx) + as.saveConsistentIndex(tx) as.invalidateCachedPerm(r.Name) as.tokenProvider.invalidateUser(r.Name) @@ -428,6 +443,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p putUser(tx, updatedUser) as.commitRevision(tx) + as.saveConsistentIndex(tx) as.invalidateCachedPerm(r.Name) as.tokenProvider.invalidateUser(r.Name) @@ -468,6 +484,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser as.invalidateCachedPerm(r.User) as.commitRevision(tx) + as.saveConsistentIndex(tx) plog.Noticef("granted role %s to user %s", r.Role, r.User) return &pb.AuthUserGrantRoleResponse{}, nil @@ -536,6 +553,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs as.invalidateCachedPerm(r.Name) as.commitRevision(tx) + as.saveConsistentIndex(tx) plog.Noticef("revoked role %s from user %s", r.Role, 
r.Name) return &pb.AuthUserRevokeRoleResponse{}, nil @@ -600,6 +618,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) as.clearCachedPerm() as.commitRevision(tx) + as.saveConsistentIndex(tx) plog.Noticef("revoked key %s from role %s", r.Key, r.Role) return &pb.AuthRoleRevokePermissionResponse{}, nil @@ -645,6 +664,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete } as.commitRevision(tx) + as.saveConsistentIndex(tx) plog.Noticef("deleted role %s", r.Role) return &pb.AuthRoleDeleteResponse{}, nil @@ -667,6 +687,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, putRole(tx, newRole) as.commitRevision(tx) + as.saveConsistentIndex(tx) plog.Noticef("Role %s is created", r.Name) @@ -706,6 +727,16 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) ( }) if idx < len(role.KeyPermission) && bytes.Equal(role.KeyPermission[idx].Key, r.Perm.Key) && bytes.Equal(role.KeyPermission[idx].RangeEnd, r.Perm.RangeEnd) { + if role.KeyPermission[idx].PermType == r.Perm.PermType { + as.lg.Warn( + "ignored grant permission request to a role, existing permission", + zap.String("role-name", r.Name), + zap.ByteString("key", r.Perm.Key), + zap.ByteString("range-end", r.Perm.RangeEnd), + zap.String("permission-type", authpb.Permission_Type_name[int32(r.Perm.PermType)]), + ) + return &pb.AuthRoleGrantPermissionResponse{}, nil + } // update existing permission role.KeyPermission[idx].PermType = r.Perm.PermType } else { @@ -727,6 +758,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) ( as.clearCachedPerm() as.commitRevision(tx) + as.saveConsistentIndex(tx) plog.Noticef("role %s's permission of key %s is updated as %s", r.Name, r.Perm.Key, authpb.Permission_Type_name[int32(r.Perm.PermType)]) @@ -931,6 +963,7 @@ func NewAuthStore(be backend.Backend, tp TokenProvider) *authStore { if as.Revision() == 0 { as.commitRevision(tx) + as.saveConsistentIndex(tx) } tx.Unlock() @@ -1134,3 +1167,11 @@ func (as *authStore) HasRole(user, role string) bool { return false } + +func (as *authStore) saveConsistentIndex(tx backend.BatchTx) { + if as.syncConsistentIndex != nil { + as.syncConsistentIndex(tx) + } else { + plog.Errorf("failed to save consistentIndex,syncConsistentIndex is nil") + } +} diff --git a/auth/store_test.go b/auth/store_test.go index 155b2f07f52..43532c60e6b 100644 --- a/auth/store_test.go +++ b/auth/store_test.go @@ -301,6 +301,55 @@ func TestListUsers(t *testing.T) { } } +func TestRoleGrantPermissionRevision(t *testing.T) { + as, tearDown := setupAuthStore(t) + defer tearDown(t) + + _, err := as.RoleAdd(&pb.AuthRoleAddRequest{Name: "role-test-1"}) + if err != nil { + t.Fatal(err) + } + + perm := &authpb.Permission{ + PermType: authpb.WRITE, + Key: []byte("Keys"), + RangeEnd: []byte("RangeEnd"), + } + _, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{ + Name: "role-test-1", + Perm: perm, + }) + + if err != nil { + t.Fatal(err) + } + + r, err := as.RoleGet(&pb.AuthRoleGetRequest{Role: "role-test-1"}) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(perm, r.Perm[0]) { + t.Errorf("expected %v, got %v", perm, r.Perm[0]) + } + + oldRevision := as.Revision() + + _, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{ + Name: "role-test-1", + Perm: perm, + }) + + if err != nil { + t.Error(err) + } + newRevision := as.Revision() + + if oldRevision != newRevision { + t.Errorf("expected revision diff is 0, got %d", 
newRevision-oldRevision) + } +} + func TestRoleGrantPermission(t *testing.T) { as, tearDown := setupAuthStore(t) defer tearDown(t) diff --git a/etcdserver/backend.go b/etcdserver/backend.go index 647773d474f..3481f37a810 100644 --- a/etcdserver/backend.go +++ b/etcdserver/backend.go @@ -71,7 +71,7 @@ func openBackend(cfg ServerConfig) backend.Backend { // case, replace the db with the snapshot db sent by the leader. func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) { var cIndex consistentIndex - kv := mvcc.New(oldbe, &lease.FakeLessor{}, &cIndex) + kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) defer kv.Close() if snapshot.Metadata.Index <= kv.ConsistentIndex() { return oldbe, nil diff --git a/etcdserver/server.go b/etcdserver/server.go index 083e9f72ac3..61cb91fbe08 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -447,8 +447,27 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. - srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds()))) - srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex) + srv.lessor = lease.NewLessor( + srv.getLogger(), + srv.be, + lease.LessorConfig{ + MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), + CheckpointInterval: cfg.LeaseCheckpointInterval, + ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), + }) + + tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken, + func(index uint64) <-chan struct{} { + return srv.applyWait.Wait(index) + }, + ) + if err != nil { + plog.Warningf("failed to create token provider,err is %v", err) + return nil, err + } + srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost)) + + srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) if beExist { kvindex := srv.kv.ConsistentIndex() // TODO: remove kvindex != 0 checking when we do not expect users to upgrade @@ -470,16 +489,6 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { }() srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex()) - tp, err := auth.NewTokenProvider(cfg.AuthToken, - func(index uint64) <-chan struct{} { - return srv.applyWait.Wait(index) - }, - ) - if err != nil { - plog.Errorf("failed to create token provider: %s", err) - return nil, err - } - srv.authStore = auth.NewAuthStore(srv.be, tp) if num := cfg.AutoCompactionRetention; num != 0 { srv.compactor, err = compactor.New(cfg.AutoCompactionMode, num, srv.kv, srv) if err != nil { diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index e3ea0f9250c..61965e801b1 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -916,7 +916,7 @@ func TestSnapshot(t *testing.T) { r: *r, store: st, } - srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex) + srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{}) srv.be = be ch := make(chan struct{}, 2) @@ -994,7 +994,7 @@ func TestSnapshotOrdering(t *testing.T) { be, tmpPath := backend.NewDefaultTmpBackend() defer os.RemoveAll(tmpPath) - s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex) + s.kv = mvcc.New(zap.NewExample(), be, 
&lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{}) s.be = be s.start() @@ -1052,7 +1052,7 @@ func TestTriggerSnap(t *testing.T) { } srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} - srv.kv = mvcc.New(be, &lease.FakeLessor{}, &srv.consistIndex) + srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{}) srv.be = be srv.start() @@ -1121,7 +1121,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { defer func() { os.RemoveAll(tmpPath) }() - s.kv = mvcc.New(be, &lease.FakeLessor{}, &s.consistIndex) + s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{}) s.be = be s.start() diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index d6f49ee14a9..722108c78c2 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -710,7 +710,7 @@ func TestKVSnapshot(t *testing.T) { func TestWatchableKVWatch(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 78df19326b9..2c332b36081 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -15,6 +15,7 @@ package mvcc import ( + "go.etcd.io/etcd/auth" "sync" "time" @@ -67,11 +68,11 @@ type watchableStore struct { // cancel operations. type cancelFunc func() -func New(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) ConsistentWatchableKV { - return newWatchableStore(b, le, ig) +func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV { + return newWatchableStore(lg, b, le, as, ig, cfg) } -func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *watchableStore { +func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore { s := &watchableStore{ store: NewStore(b, le, ig), victimc: make(chan struct{}, 1), @@ -85,6 +86,10 @@ func newWatchableStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGet // use this store as the deleter so revokes trigger watch events s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() }) } + if as != nil { + // TODO: encapsulating consistentindex into a separate package + as.SetConsistentIndexSyncer(s.store.saveIndex) + } s.wg.Add(2) go s.syncWatchersLoop() go s.syncVictimsLoop() diff --git a/mvcc/watchable_store_bench_test.go b/mvcc/watchable_store_bench_test.go index 769d1bc38a8..053d841cd60 100644 --- a/mvcc/watchable_store_bench_test.go +++ b/mvcc/watchable_store_bench_test.go @@ -25,7 +25,7 @@ import ( func BenchmarkWatchableStorePut(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := New(be, &lease.FakeLessor{}, nil) + s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -46,7 +46,7 @@ func BenchmarkWatchableStorePut(b *testing.B) { func BenchmarkWatchableStoreTxnPut(b *testing.B) { var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := New(be, &lease.FakeLessor{}, &i) + s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &i, StoreConfig{}) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -67,7 +67,7 @@ func 
BenchmarkWatchableStoreTxnPut(b *testing.B) { // many synced watchers receiving a Put notification. func BenchmarkWatchableStoreWatchSyncPut(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(be, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer cleanup(s, be, tmpPath) k := []byte("testkey") @@ -162,7 +162,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(be, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer func() { s.store.Close() diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index dc96d53366c..f6086079400 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -30,7 +30,11 @@ import ( func TestWatch(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() +<<<<<<< HEAD s := newWatchableStore(b, &lease.FakeLessor{}, nil) +======= + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) +>>>>>>> *: fix auth revision corruption bug defer func() { s.store.Close() @@ -52,7 +56,11 @@ func TestWatch(t *testing.T) { func TestNewWatcherCancel(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() +<<<<<<< HEAD s := newWatchableStore(b, &lease.FakeLessor{}, nil) +======= + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) +>>>>>>> *: fix auth revision corruption bug defer func() { s.store.Close() @@ -222,7 +230,11 @@ func TestSyncWatchers(t *testing.T) { // TestWatchCompacted tests a watcher that watches on a compacted revision. func TestWatchCompacted(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() +<<<<<<< HEAD s := newWatchableStore(b, &lease.FakeLessor{}, nil) +======= + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) +>>>>>>> *: fix auth revision corruption bug defer func() { s.store.Close() @@ -259,7 +271,11 @@ func TestWatchCompacted(t *testing.T) { func TestWatchFutureRev(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() +<<<<<<< HEAD s := newWatchableStore(b, &lease.FakeLessor{}, nil) +======= + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) +>>>>>>> *: fix auth revision corruption bug defer func() { s.store.Close() @@ -300,7 +316,11 @@ func TestWatchRestore(t *testing.T) { test := func(delay time.Duration) func(t *testing.T) { return func(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() +<<<<<<< HEAD s := newWatchableStore(b, &lease.FakeLessor{}, nil) +======= + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) +>>>>>>> *: fix auth revision corruption bug defer cleanup(s, b, tmpPath) testKey := []byte("foo") @@ -308,7 +328,11 @@ func TestWatchRestore(t *testing.T) { rev := s.Put(testKey, testValue, lease.NoLease) newBackend, newPath := backend.NewDefaultTmpBackend() +<<<<<<< HEAD newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil) +======= + newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, nil, StoreConfig{}) +>>>>>>> *: fix auth revision corruption bug defer cleanup(newStore, newBackend, newPath) w := newStore.NewWatchStream() @@ -346,11 +370,19 @@ func TestWatchRestore(t *testing.T) { // 5. 
choose the watcher from step 1, without panic func TestWatchRestoreSyncedWatcher(t *testing.T) { b1, b1Path := backend.NewDefaultTmpBackend() +<<<<<<< HEAD s1 := newWatchableStore(b1, &lease.FakeLessor{}, nil) defer cleanup(s1, b1, b1Path) b2, b2Path := backend.NewDefaultTmpBackend() s2 := newWatchableStore(b2, &lease.FakeLessor{}, nil) +======= + s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + defer cleanup(s1, b1, b1Path) + + b2, b2Path := backend.NewDefaultTmpBackend() + s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, nil, StoreConfig{}) +>>>>>>> *: fix auth revision corruption bug defer cleanup(s2, b2, b2Path) testKey, testValue := []byte("foo"), []byte("bar") @@ -397,7 +429,11 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) { // TestWatchBatchUnsynced tests batching on unsynced watchers func TestWatchBatchUnsynced(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() +<<<<<<< HEAD s := newWatchableStore(b, &lease.FakeLessor{}, nil) +======= + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) +>>>>>>> *: fix auth revision corruption bug oldMaxRevs := watchBatchMaxRevs defer func() { @@ -531,7 +567,11 @@ func TestWatchVictims(t *testing.T) { oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync b, tmpPath := backend.NewDefaultTmpBackend() +<<<<<<< HEAD s := newWatchableStore(b, &lease.FakeLessor{}, nil) +======= + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) +>>>>>>> *: fix auth revision corruption bug defer func() { s.store.Close() @@ -609,7 +649,11 @@ func TestWatchVictims(t *testing.T) { // canceling its watches. func TestStressWatchCancelClose(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() +<<<<<<< HEAD s := newWatchableStore(b, &lease.FakeLessor{}, nil) +======= + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) +>>>>>>> *: fix auth revision corruption bug defer func() { s.store.Close() diff --git a/mvcc/watcher_bench_test.go b/mvcc/watcher_bench_test.go index 8a4242f3f20..fa79ec97195 100644 --- a/mvcc/watcher_bench_test.go +++ b/mvcc/watcher_bench_test.go @@ -24,7 +24,11 @@ import ( func BenchmarkKVWatcherMemoryUsage(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() +<<<<<<< HEAD watchable := newWatchableStore(be, &lease.FakeLessor{}, nil) +======= + watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) +>>>>>>> *: fix auth revision corruption bug defer cleanup(watchable, be, tmpPath) diff --git a/mvcc/watcher_test.go b/mvcc/watcher_test.go index 3d259d1f160..b415c682236 100644 --- a/mvcc/watcher_test.go +++ b/mvcc/watcher_test.go @@ -31,7 +31,7 @@ import ( // and the watched event attaches the correct watchID. func TestWatcherWatchID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -83,7 +83,7 @@ func TestWatcherWatchID(t *testing.T) { // and returns events with matching prefixes. 
func TestWatcherWatchPrefix(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -157,7 +157,7 @@ func TestWatcherWatchPrefix(t *testing.T) { // does not create watcher, which panics when canceling in range tree. func TestWatcherWatchWrongRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -177,7 +177,7 @@ func TestWatcherWatchWrongRange(t *testing.T) { func TestWatchDeleteRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(b, &lease.FakeLessor{}, nil) + s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) defer func() { s.store.Close() @@ -216,7 +216,7 @@ func TestWatchDeleteRange(t *testing.T) { // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -308,7 +308,7 @@ func TestWatcherRequestProgress(t *testing.T) { func TestWatcherWatchWithFilter(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) + s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() From 06a2f816e98db69882450083bbd2b5ffeabf9bda Mon Sep 17 00:00:00 2001 From: shawwang Date: Mon, 24 Feb 2020 15:47:45 +0800 Subject: [PATCH 2/7] auth: add new metric 'etcd_debugging_auth_revision' --- auth/metrics.go | 42 ++++++++++++++++++++++++++++++++++++++++++ auth/store.go | 10 ++++++++++ 2 files changed, 52 insertions(+) create mode 100644 auth/metrics.go diff --git a/auth/metrics.go b/auth/metrics.go new file mode 100644 index 00000000000..fe0d28e22d5 --- /dev/null +++ b/auth/metrics.go @@ -0,0 +1,42 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package auth + +import ( + "github.com/prometheus/client_golang/prometheus" + "sync" +) + +var ( + currentAuthRevision = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "auth", + Name: "revision", + Help: "The current revision of auth store.", + }, + func() float64 { + reportCurrentAuthRevMu.RLock() + defer reportCurrentAuthRevMu.RUnlock() + return reportCurrentAuthRev() + }, + ) + // overridden by auth store initialization + reportCurrentAuthRevMu sync.RWMutex + reportCurrentAuthRev = func() float64 { return 0 } +) + +func init() { + prometheus.MustRegister(currentAuthRevision) +} diff --git a/auth/store.go b/auth/store.go index e99ddee51ac..02d18a6cef8 100644 --- a/auth/store.go +++ b/auth/store.go @@ -966,6 +966,8 @@ func NewAuthStore(be backend.Backend, tp TokenProvider) *authStore { as.saveConsistentIndex(tx) } + as.setupMetricsReporter() + tx.Unlock() be.ForceCommit() @@ -1175,3 +1177,11 @@ func (as *authStore) saveConsistentIndex(tx backend.BatchTx) { plog.Errorf("failed to save consistentIndex,syncConsistentIndex is nil") } } + +func (as *authStore) setupMetricsReporter() { + reportCurrentAuthRevMu.Lock() + reportCurrentAuthRev = func() float64 { + return float64(as.Revision()) + } + reportCurrentAuthRevMu.Unlock() +} From e7291a1dab7e585eccd2cebee14557d6409dbc50 Mon Sep 17 00:00:00 2001 From: tangcong Date: Tue, 3 Mar 2020 17:26:04 +0800 Subject: [PATCH 3/7] auth: print warning log when error is ErrAuthOldRevision --- auth/store.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/auth/store.go b/auth/store.go index 02d18a6cef8..44b04de069a 100644 --- a/auth/store.go +++ b/auth/store.go @@ -775,8 +775,13 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE if revision == 0 { return ErrUserEmpty } - - if revision < as.Revision() { + rev := as.Revision() + if revision < rev { + as.lg.Warn("request auth revision is less than current node auth revision", + zap.Uint64("current node auth revision", rev), + zap.Uint64("request auth revision", revision), + zap.ByteString("request key", key), + zap.Error(ErrAuthOldRevision)) return ErrAuthOldRevision } From acd94224599b75c2557efa2fab4ff142214d96fd Mon Sep 17 00:00:00 2001 From: tangcong Date: Tue, 3 Mar 2020 17:38:37 +0800 Subject: [PATCH 4/7] auth: cleanup saveConsistentIndex in NewAuthStore --- auth/store.go | 1 - 1 file changed, 1 deletion(-) diff --git a/auth/store.go b/auth/store.go index 44b04de069a..a3c46b18801 100644 --- a/auth/store.go +++ b/auth/store.go @@ -968,7 +968,6 @@ func NewAuthStore(be backend.Backend, tp TokenProvider) *authStore { if as.Revision() == 0 { as.commitRevision(tx) - as.saveConsistentIndex(tx) } as.setupMetricsReporter() From 27dffc6d018ac491616ea190e6e52f06659b40d5 Mon Sep 17 00:00:00 2001 From: tangcong Date: Tue, 10 Mar 2020 10:56:33 +0800 Subject: [PATCH 5/7] etcdserver: print warn log when failed to apply request --- etcdserver/apply.go | 3 +++ etcdserver/util.go | 17 ++++++++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 93e78e390c8..555338574a5 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -110,6 +110,9 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { ar := &applyResult{} defer func(start time.Time) { warnOfExpensiveRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err) + if ar.err != nil { + warnOfFailedRequest(start, &pb.InternalRaftStringer{Request: r}, ar.resp, 
ar.err) + } }(time.Now()) // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls diff --git a/etcdserver/util.go b/etcdserver/util.go index 79bb6b859ca..4f73e86a794 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -109,7 +109,22 @@ func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg pro warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err) } -func warnOfExpensiveReadOnlyTxnRequest(now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) { +func warnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) { + var resp string + if !isNil(respMsg) { + resp = fmt.Sprintf("size:%d", proto.Size(respMsg)) + } + d := time.Since(now) + plog.Warningf( + "failed to apply request", + zap.Duration("took", d), + zap.String("request", reqStringer.String()), + zap.String("response", resp), + zap.Error(err), + ) +} + +func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) { reqStringer := pb.NewLoggableTxnRequest(r) var resp string if !isNil(txnResponse) { From 64fc4cc24485c2dc5c2d720d97c5982bfebe740a Mon Sep 17 00:00:00 2001 From: tangcong Date: Sun, 22 Mar 2020 12:04:19 +0800 Subject: [PATCH 6/7] auth: ensure RoleGrantPermission is compatible with older versions --- auth/store.go | 10 ---------- auth/store_test.go | 49 ---------------------------------------------- 2 files changed, 59 deletions(-) diff --git a/auth/store.go b/auth/store.go index a3c46b18801..d2fad6af4d4 100644 --- a/auth/store.go +++ b/auth/store.go @@ -727,16 +727,6 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) ( }) if idx < len(role.KeyPermission) && bytes.Equal(role.KeyPermission[idx].Key, r.Perm.Key) && bytes.Equal(role.KeyPermission[idx].RangeEnd, r.Perm.RangeEnd) { - if role.KeyPermission[idx].PermType == r.Perm.PermType { - as.lg.Warn( - "ignored grant permission request to a role, existing permission", - zap.String("role-name", r.Name), - zap.ByteString("key", r.Perm.Key), - zap.ByteString("range-end", r.Perm.RangeEnd), - zap.String("permission-type", authpb.Permission_Type_name[int32(r.Perm.PermType)]), - ) - return &pb.AuthRoleGrantPermissionResponse{}, nil - } // update existing permission role.KeyPermission[idx].PermType = r.Perm.PermType } else { diff --git a/auth/store_test.go b/auth/store_test.go index 43532c60e6b..155b2f07f52 100644 --- a/auth/store_test.go +++ b/auth/store_test.go @@ -301,55 +301,6 @@ func TestListUsers(t *testing.T) { } } -func TestRoleGrantPermissionRevision(t *testing.T) { - as, tearDown := setupAuthStore(t) - defer tearDown(t) - - _, err := as.RoleAdd(&pb.AuthRoleAddRequest{Name: "role-test-1"}) - if err != nil { - t.Fatal(err) - } - - perm := &authpb.Permission{ - PermType: authpb.WRITE, - Key: []byte("Keys"), - RangeEnd: []byte("RangeEnd"), - } - _, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{ - Name: "role-test-1", - Perm: perm, - }) - - if err != nil { - t.Fatal(err) - } - - r, err := as.RoleGet(&pb.AuthRoleGetRequest{Role: "role-test-1"}) - if err != nil { - t.Fatal(err) - } - - if !reflect.DeepEqual(perm, r.Perm[0]) { - t.Errorf("expected %v, got %v", perm, r.Perm[0]) - } - - oldRevision := as.Revision() - - _, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{ - Name: "role-test-1", - Perm: perm, - }) - - if err != nil { - t.Error(err) - } - newRevision := as.Revision() - - if oldRevision != newRevision { - 
t.Errorf("expected revision diff is 0, got %d", newRevision-oldRevision) - } -} - func TestRoleGrantPermission(t *testing.T) { as, tearDown := setupAuthStore(t) defer tearDown(t) From 294e71448926e220dcf88168236c13af58d22b2e Mon Sep 17 00:00:00 2001 From: tangcong Date: Mon, 6 Apr 2020 10:47:14 +0800 Subject: [PATCH 7/7] *: fix cherry-pick conflict --- auth/store.go | 11 +++-- etcdserver/backend.go | 2 +- etcdserver/server.go | 17 +++----- etcdserver/server_test.go | 8 ++-- etcdserver/util.go | 12 ++---- integration/v3_watch_test.go | 8 ++-- mvcc/kv_test.go | 2 +- mvcc/watchable_store.go | 8 ++-- mvcc/watchable_store_bench_test.go | 8 ++-- mvcc/watchable_store_test.go | 66 +++++------------------------- mvcc/watcher_bench_test.go | 6 +-- mvcc/watcher_test.go | 12 +++--- 12 files changed, 50 insertions(+), 110 deletions(-) diff --git a/auth/store.go b/auth/store.go index d2fad6af4d4..21c121e50ae 100644 --- a/auth/store.go +++ b/auth/store.go @@ -209,7 +209,6 @@ type authStore struct { tokenProvider TokenProvider syncConsistentIndex saveConsistentIndexFunc - bcryptCost int // the algorithm cost / strength for hashing auth passwords } func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) { @@ -767,11 +766,11 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE } rev := as.Revision() if revision < rev { - as.lg.Warn("request auth revision is less than current node auth revision", - zap.Uint64("current node auth revision", rev), - zap.Uint64("request auth revision", revision), - zap.ByteString("request key", key), - zap.Error(ErrAuthOldRevision)) + plog.Warningf("request auth revision is less than current node auth revision,"+ + "current node auth revision is %d,"+ + "request auth revision is %d,"+ + "request key is %s, "+ + "err is %v", rev, revision, key, ErrAuthOldRevision) return ErrAuthOldRevision } diff --git a/etcdserver/backend.go b/etcdserver/backend.go index 3481f37a810..fe2f86503e0 100644 --- a/etcdserver/backend.go +++ b/etcdserver/backend.go @@ -71,7 +71,7 @@ func openBackend(cfg ServerConfig) backend.Backend { // case, replace the db with the snapshot db sent by the leader. func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) { var cIndex consistentIndex - kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + kv := mvcc.New(oldbe, &lease.FakeLessor{}, nil, &cIndex) defer kv.Close() if snapshot.Metadata.Index <= kv.ConsistentIndex() { return oldbe, nil diff --git a/etcdserver/server.go b/etcdserver/server.go index 61cb91fbe08..7c1948c5aa4 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -447,16 +447,9 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. 
- srv.lessor = lease.NewLessor( - srv.getLogger(), - srv.be, - lease.LessorConfig{ - MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), - CheckpointInterval: cfg.LeaseCheckpointInterval, - ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), - }) - - tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken, + srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds()))) + + tp, err := auth.NewTokenProvider(cfg.AuthToken, func(index uint64) <-chan struct{} { return srv.applyWait.Wait(index) }, @@ -465,9 +458,9 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { plog.Warningf("failed to create token provider,err is %v", err) return nil, err } - srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost)) + srv.authStore = auth.NewAuthStore(srv.be, tp) - srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + srv.kv = mvcc.New(srv.be, srv.lessor, srv.authStore, &srv.consistIndex) if beExist { kvindex := srv.kv.ConsistentIndex() // TODO: remove kvindex != 0 checking when we do not expect users to upgrade diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 61965e801b1..7ab52b5367e 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -916,7 +916,7 @@ func TestSnapshot(t *testing.T) { r: *r, store: st, } - srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{}) + srv.kv = mvcc.New(be, &lease.FakeLessor{}, nil, &srv.consistIndex) srv.be = be ch := make(chan struct{}, 2) @@ -994,7 +994,7 @@ func TestSnapshotOrdering(t *testing.T) { be, tmpPath := backend.NewDefaultTmpBackend() defer os.RemoveAll(tmpPath) - s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{}) + s.kv = mvcc.New(be, &lease.FakeLessor{}, nil, &s.consistIndex) s.be = be s.start() @@ -1052,7 +1052,7 @@ func TestTriggerSnap(t *testing.T) { } srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster} - srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{}) + srv.kv = mvcc.New(be, &lease.FakeLessor{}, nil, &srv.consistIndex) srv.be = be srv.start() @@ -1121,7 +1121,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { defer func() { os.RemoveAll(tmpPath) }() - s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{}) + s.kv = mvcc.New(be, &lease.FakeLessor{}, nil, &s.consistIndex) s.be = be s.start() diff --git a/etcdserver/util.go b/etcdserver/util.go index 4f73e86a794..eb11d5d0de4 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -109,22 +109,16 @@ func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg pro warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err) } -func warnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) { +func warnOfFailedRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) { var resp string if !isNil(respMsg) { resp = fmt.Sprintf("size:%d", proto.Size(respMsg)) } d := time.Since(now) - plog.Warningf( - "failed to apply request", - zap.Duration("took", d), - zap.String("request", reqStringer.String()), - zap.String("response", resp), - zap.Error(err), - ) + plog.Warningf("failed to apply request,took %v,request %s,resp %s,err is %v", d, reqStringer.String(), resp, err) } -func 
warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) { +func warnOfExpensiveReadOnlyTxnRequest(now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) { reqStringer := pb.NewLoggableTxnRequest(r) var resp string if !isNil(txnResponse) { diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index c91f4df6503..44c288c4956 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -781,9 +781,11 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) { type eventsSortByKey []*mvccpb.Event -func (evs eventsSortByKey) Len() int { return len(evs) } -func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] } -func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 } +func (evs eventsSortByKey) Len() int { return len(evs) } +func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] } +func (evs eventsSortByKey) Less(i, j int) bool { + return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 +} func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) { defer testutil.AfterTest(t) diff --git a/mvcc/kv_test.go b/mvcc/kv_test.go index 722108c78c2..bafaac81ee7 100644 --- a/mvcc/kv_test.go +++ b/mvcc/kv_test.go @@ -710,7 +710,7 @@ func TestKVSnapshot(t *testing.T) { func TestWatchableKVWatch(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 2c332b36081..8f12bddba88 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -15,10 +15,10 @@ package mvcc import ( - "go.etcd.io/etcd/auth" "sync" "time" + "github.com/coreos/etcd/auth" "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" @@ -68,11 +68,11 @@ type watchableStore struct { // cancel operations. 
type cancelFunc func() -func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV { - return newWatchableStore(lg, b, le, as, ig, cfg) +func New(b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter) ConsistentWatchableKV { + return newWatchableStore(b, le, as, ig) } -func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore { +func newWatchableStore(b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter) *watchableStore { s := &watchableStore{ store: NewStore(b, le, ig), victimc: make(chan struct{}, 1), diff --git a/mvcc/watchable_store_bench_test.go b/mvcc/watchable_store_bench_test.go index 053d841cd60..48b05c69eec 100644 --- a/mvcc/watchable_store_bench_test.go +++ b/mvcc/watchable_store_bench_test.go @@ -25,7 +25,7 @@ import ( func BenchmarkWatchableStorePut(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := New(be, &lease.FakeLessor{}, nil, nil) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -46,7 +46,7 @@ func BenchmarkWatchableStorePut(b *testing.B) { func BenchmarkWatchableStoreTxnPut(b *testing.B) { var i fakeConsistentIndex be, tmpPath := backend.NewDefaultTmpBackend() - s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &i, StoreConfig{}) + s := New(be, &lease.FakeLessor{}, nil, &i) defer cleanup(s, be, tmpPath) // arbitrary number of bytes @@ -67,7 +67,7 @@ func BenchmarkWatchableStoreTxnPut(b *testing.B) { // many synced watchers receiving a Put notification. func BenchmarkWatchableStoreWatchSyncPut(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(be, &lease.FakeLessor{}, nil, nil) defer cleanup(s, be, tmpPath) k := []byte("testkey") @@ -162,7 +162,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(be, &lease.FakeLessor{}, nil, nil) defer func() { s.store.Close() diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index f6086079400..b1133556ad8 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -30,11 +30,7 @@ import ( func TestWatch(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() -<<<<<<< HEAD - s := newWatchableStore(b, &lease.FakeLessor{}, nil) -======= - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) ->>>>>>> *: fix auth revision corruption bug + s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil) defer func() { s.store.Close() @@ -56,11 +52,7 @@ func TestWatch(t *testing.T) { func TestNewWatcherCancel(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() -<<<<<<< HEAD - s := newWatchableStore(b, &lease.FakeLessor{}, nil) -======= - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) ->>>>>>> *: fix auth revision corruption bug + s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil) defer func() { s.store.Close() @@ -230,11 +222,7 @@ func TestSyncWatchers(t *testing.T) { // TestWatchCompacted tests a watcher that 
watches on a compacted revision. func TestWatchCompacted(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() -<<<<<<< HEAD - s := newWatchableStore(b, &lease.FakeLessor{}, nil) -======= - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) ->>>>>>> *: fix auth revision corruption bug + s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil) defer func() { s.store.Close() @@ -271,11 +259,7 @@ func TestWatchCompacted(t *testing.T) { func TestWatchFutureRev(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() -<<<<<<< HEAD - s := newWatchableStore(b, &lease.FakeLessor{}, nil) -======= - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) ->>>>>>> *: fix auth revision corruption bug + s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil) defer func() { s.store.Close() @@ -316,11 +300,7 @@ func TestWatchRestore(t *testing.T) { test := func(delay time.Duration) func(t *testing.T) { return func(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() -<<<<<<< HEAD - s := newWatchableStore(b, &lease.FakeLessor{}, nil) -======= - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) ->>>>>>> *: fix auth revision corruption bug + s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil) defer cleanup(s, b, tmpPath) testKey := []byte("foo") @@ -328,11 +308,7 @@ func TestWatchRestore(t *testing.T) { rev := s.Put(testKey, testValue, lease.NoLease) newBackend, newPath := backend.NewDefaultTmpBackend() -<<<<<<< HEAD - newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil) -======= - newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, nil, StoreConfig{}) ->>>>>>> *: fix auth revision corruption bug + newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil, nil) defer cleanup(newStore, newBackend, newPath) w := newStore.NewWatchStream() @@ -370,19 +346,11 @@ func TestWatchRestore(t *testing.T) { // 5. 
choose the watcher from step 1, without panic func TestWatchRestoreSyncedWatcher(t *testing.T) { b1, b1Path := backend.NewDefaultTmpBackend() -<<<<<<< HEAD - s1 := newWatchableStore(b1, &lease.FakeLessor{}, nil) + s1 := newWatchableStore(b1, &lease.FakeLessor{}, nil, nil) defer cleanup(s1, b1, b1Path) b2, b2Path := backend.NewDefaultTmpBackend() - s2 := newWatchableStore(b2, &lease.FakeLessor{}, nil) -======= - s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, nil, StoreConfig{}) - defer cleanup(s1, b1, b1Path) - - b2, b2Path := backend.NewDefaultTmpBackend() - s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, nil, StoreConfig{}) ->>>>>>> *: fix auth revision corruption bug + s2 := newWatchableStore(b2, &lease.FakeLessor{}, nil, nil) defer cleanup(s2, b2, b2Path) testKey, testValue := []byte("foo"), []byte("bar") @@ -429,11 +397,7 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) { // TestWatchBatchUnsynced tests batching on unsynced watchers func TestWatchBatchUnsynced(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() -<<<<<<< HEAD - s := newWatchableStore(b, &lease.FakeLessor{}, nil) -======= - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) ->>>>>>> *: fix auth revision corruption bug + s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil) oldMaxRevs := watchBatchMaxRevs defer func() { @@ -567,11 +531,7 @@ func TestWatchVictims(t *testing.T) { oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync b, tmpPath := backend.NewDefaultTmpBackend() -<<<<<<< HEAD - s := newWatchableStore(b, &lease.FakeLessor{}, nil) -======= - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) ->>>>>>> *: fix auth revision corruption bug + s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil) defer func() { s.store.Close() @@ -649,11 +609,7 @@ func TestWatchVictims(t *testing.T) { // canceling its watches. func TestStressWatchCancelClose(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() -<<<<<<< HEAD - s := newWatchableStore(b, &lease.FakeLessor{}, nil) -======= - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) ->>>>>>> *: fix auth revision corruption bug + s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil) defer func() { s.store.Close() diff --git a/mvcc/watcher_bench_test.go b/mvcc/watcher_bench_test.go index fa79ec97195..604b1c44094 100644 --- a/mvcc/watcher_bench_test.go +++ b/mvcc/watcher_bench_test.go @@ -24,11 +24,7 @@ import ( func BenchmarkKVWatcherMemoryUsage(b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() -<<<<<<< HEAD - watchable := newWatchableStore(be, &lease.FakeLessor{}, nil) -======= - watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{}) ->>>>>>> *: fix auth revision corruption bug + watchable := newWatchableStore(be, &lease.FakeLessor{}, nil, nil) defer cleanup(watchable, be, tmpPath) diff --git a/mvcc/watcher_test.go b/mvcc/watcher_test.go index b415c682236..078bfe5299d 100644 --- a/mvcc/watcher_test.go +++ b/mvcc/watcher_test.go @@ -31,7 +31,7 @@ import ( // and the watched event attaches the correct watchID. 
func TestWatcherWatchID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -83,7 +83,7 @@ func TestWatcherWatchID(t *testing.T) { // and returns events with matching prefixes. func TestWatcherWatchPrefix(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -157,7 +157,7 @@ func TestWatcherWatchPrefix(t *testing.T) { // does not create watcher, which panics when canceling in range tree. func TestWatcherWatchWrongRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -177,7 +177,7 @@ func TestWatcherWatchWrongRange(t *testing.T) { func TestWatchDeleteRange(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}) + s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil) defer func() { s.store.Close() @@ -216,7 +216,7 @@ func TestWatchDeleteRange(t *testing.T) { // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -308,7 +308,7 @@ func TestWatcherRequestProgress(t *testing.T) { func TestWatcherWatchWithFilter(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})) + s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil, nil)) defer cleanup(s, b, tmpPath) w := s.NewWatchStream()
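Why the series works, in one picture: every auth mutation now bumps the auth store revision and persists the raft consistent index inside the same backend batch transaction, so a member that crashes and replays its WAL skips the already-applied entries instead of re-executing them and inflating its auth revision past the rest of the cluster. The following is a minimal, self-contained Go sketch of that invariant only; fakeTx, applyUserAdd, and the "authRevision"/"consistentIndex" keys are hypothetical stand-ins for illustration, not etcd's real backend.BatchTx API or bucket layout.

package main

import "fmt"

// fakeTx stands in for one atomic batch of writes to the backend
// (etcd's backend.BatchTx); everything put here commits together.
type fakeTx struct{ kv map[string]uint64 }

func (t *fakeTx) put(k string, v uint64) { t.kv[k] = v }

// authStore mirrors the shape of the patched store: the consistent-index
// syncer is injected (SetConsistentIndexSyncer in the series) so each
// mutating method can save the applied index in the same transaction.
type authStore struct {
	revision  uint64
	syncIndex func(tx *fakeTx)
}

func (as *authStore) commitRevision(tx *fakeTx) {
	as.revision++
	tx.put("authRevision", as.revision)
}

// applyUserAdd sketches the pattern every auth write path follows
// after the fix: mutate, bump the revision, then sync the index.
func (as *authStore) applyUserAdd(tx *fakeTx) {
	// ... putUser(tx, newUser) elided ...
	as.commitRevision(tx)
	if as.syncIndex != nil {
		as.syncIndex(tx) // the fix: revision and index commit atomically
	}
}

func main() {
	be := &fakeTx{kv: map[string]uint64{}}
	applied := uint64(0)

	as := &authStore{}
	as.syncIndex = func(tx *fakeTx) { tx.put("consistentIndex", applied) }

	applied = 7 // raft entry 7 carries an AuthUserAdd request
	as.applyUserAdd(be)

	// On restart, recovery replays only entries above the persisted
	// consistentIndex, so entry 7 cannot be applied twice.
	fmt.Printf("backend: %v\n", be.kv)
	// backend: map[authRevision:1 consistentIndex:7]
}

Before the fix, auth-only transactions never touched the consistent index, so the two values could land in different batch transactions; a crash in between left the persisted index behind the revision, and replay re-applied the auth entries. That replay is exactly the revision divergence that the warning added in PATCH 3/7 makes visible and that the etcd_debugging_auth_revision gauge from PATCH 2/7 lets operators compare across members.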