Skip to content

Commit

Permalink
Merge pull request #11652 from tangcong/fix-auth-store-corruption-bug
Browse files Browse the repository at this point in the history
auth/store: save consistentIndex to fix a data corruption bug
  • Loading branch information
jingyih authored Mar 1, 2020
2 parents d774916 + f14d2a0 commit cb63341
Show file tree
Hide file tree
Showing 13 changed files with 226 additions and 47 deletions.
42 changes: 42 additions & 0 deletions auth/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package auth

import (
"github.com/prometheus/client_golang/prometheus"
"sync"
)

var (
currentAuthRevision = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd_debugging",
Subsystem: "auth",
Name: "revision",
Help: "The current revision of auth store.",
},
func() float64 {
reportCurrentAuthRevMu.RLock()
defer reportCurrentAuthRevMu.RUnlock()
return reportCurrentAuthRev()
},
)
// overridden by auth store initialization
reportCurrentAuthRevMu sync.RWMutex
reportCurrentAuthRev = func() float64 { return 0 }
)

func init() {
prometheus.MustRegister(currentAuthRevision)
}
53 changes: 51 additions & 2 deletions auth/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ type AuthenticateParamIndex struct{}
// AuthenticateParamSimpleTokenPrefix is used for a key of context in the parameters of Authenticate()
type AuthenticateParamSimpleTokenPrefix struct{}

// saveConsistentIndexFunc is used to sync consistentIndex to backend, now reusing store.saveIndex
type saveConsistentIndexFunc func(tx backend.BatchTx)

// AuthStore defines auth storage interface.
type AuthStore interface {
// AuthEnable turns on the authentication feature
Expand Down Expand Up @@ -183,6 +186,9 @@ type AuthStore interface {

// HasRole checks that user has role
HasRole(user, role string) bool

// SetConsistentIndexSyncer sets consistentIndex syncer
SetConsistentIndexSyncer(syncer saveConsistentIndexFunc)
}

type TokenProvider interface {
Expand All @@ -206,10 +212,14 @@ type authStore struct {

rangePermCache map[string]*unifiedRangePermissions // username -> unifiedRangePermissions

tokenProvider TokenProvider
bcryptCost int // the algorithm cost / strength for hashing auth passwords
tokenProvider TokenProvider
syncConsistentIndex saveConsistentIndexFunc
bcryptCost int // the algorithm cost / strength for hashing auth passwords
}

func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) {
as.syncConsistentIndex = syncer
}
func (as *authStore) AuthEnable() error {
as.enabledMu.Lock()
defer as.enabledMu.Unlock()
Expand Down Expand Up @@ -258,6 +268,7 @@ func (as *authStore) AuthDisable() {
tx.Lock()
tx.UnsafePut(authBucketName, enableFlagKey, authDisabled)
as.commitRevision(tx)
as.saveConsistentIndex(tx)
tx.Unlock()
b.ForceCommit()

Expand Down Expand Up @@ -403,6 +414,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse,
putUser(as.lg, tx, newUser)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info("added a user", zap.String("user-name", r.Name))
return &pb.AuthUserAddResponse{}, nil
Expand All @@ -426,6 +438,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete
delUser(tx, r.Name)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
Expand Down Expand Up @@ -470,6 +483,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p
putUser(as.lg, tx, updatedUser)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.invalidateCachedPerm(r.Name)
as.tokenProvider.invalidateUser(r.Name)
Expand Down Expand Up @@ -518,6 +532,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser
as.invalidateCachedPerm(r.User)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info(
"granted a role to a user",
Expand Down Expand Up @@ -596,6 +611,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs
as.invalidateCachedPerm(r.Name)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info(
"revoked a role from a user",
Expand Down Expand Up @@ -666,6 +682,7 @@ func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest)
as.clearCachedPerm()

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info(
"revoked a permission on range",
Expand Down Expand Up @@ -717,6 +734,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete
}

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info("deleted a role", zap.String("role-name", r.Role))
return &pb.AuthRoleDeleteResponse{}, nil
Expand All @@ -743,6 +761,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse,
putRole(as.lg, tx, newRole)

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info("created a role", zap.String("role-name", r.Name))
return &pb.AuthRoleAddResponse{}, nil
Expand Down Expand Up @@ -781,6 +800,16 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
})

if idx < len(role.KeyPermission) && bytes.Equal(role.KeyPermission[idx].Key, r.Perm.Key) && bytes.Equal(role.KeyPermission[idx].RangeEnd, r.Perm.RangeEnd) {
if role.KeyPermission[idx].PermType == r.Perm.PermType {
as.lg.Warn(
"ignored grant permission request to a role, existing permission",
zap.String("role-name", r.Name),
zap.ByteString("key", r.Perm.Key),
zap.ByteString("range-end", r.Perm.RangeEnd),
zap.String("permission-type", authpb.Permission_Type_name[int32(r.Perm.PermType)]),
)
return &pb.AuthRoleGrantPermissionResponse{}, nil
}
// update existing permission
role.KeyPermission[idx].PermType = r.Perm.PermType
} else {
Expand All @@ -802,6 +831,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (
as.clearCachedPerm()

as.commitRevision(tx)
as.saveConsistentIndex(tx)

as.lg.Info(
"granted/updated a permission to a user",
Expand Down Expand Up @@ -1035,8 +1065,11 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo

if as.Revision() == 0 {
as.commitRevision(tx)
as.saveConsistentIndex(tx)
}

as.setupMetricsReporter()

tx.Unlock()
be.ForceCommit()

Expand Down Expand Up @@ -1279,3 +1312,19 @@ func (as *authStore) HasRole(user, role string) bool {
func (as *authStore) BcryptCost() int {
return as.bcryptCost
}

func (as *authStore) saveConsistentIndex(tx backend.BatchTx) {
if as.syncConsistentIndex != nil {
as.syncConsistentIndex(tx)
} else {
as.lg.Error("failed to save consistentIndex,syncConsistentIndex is nil")
}
}

func (as *authStore) setupMetricsReporter() {
reportCurrentAuthRevMu.Lock()
reportCurrentAuthRev = func() float64 {
return float64(as.Revision())
}
reportCurrentAuthRevMu.Unlock()
}
49 changes: 49 additions & 0 deletions auth/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,55 @@ func TestListUsers(t *testing.T) {
}
}

func TestRoleGrantPermissionRevision(t *testing.T) {
as, tearDown := setupAuthStore(t)
defer tearDown(t)

_, err := as.RoleAdd(&pb.AuthRoleAddRequest{Name: "role-test-1"})
if err != nil {
t.Fatal(err)
}

perm := &authpb.Permission{
PermType: authpb.WRITE,
Key: []byte("Keys"),
RangeEnd: []byte("RangeEnd"),
}
_, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{
Name: "role-test-1",
Perm: perm,
})

if err != nil {
t.Fatal(err)
}

r, err := as.RoleGet(&pb.AuthRoleGetRequest{Role: "role-test-1"})
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(perm, r.Perm[0]) {
t.Errorf("expected %v, got %v", perm, r.Perm[0])
}

oldRevision := as.Revision()

_, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{
Name: "role-test-1",
Perm: perm,
})

if err != nil {
t.Error(err)
}
newRevision := as.Revision()

if oldRevision != newRevision {
t.Errorf("expected revision diff is 0, got %d", newRevision-oldRevision)
}
}

func TestRoleGrantPermission(t *testing.T) {
as, tearDown := setupAuthStore(t)
defer tearDown(t)
Expand Down
2 changes: 1 addition & 1 deletion etcdserver/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
defer kv.Close()
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
return oldbe, nil
Expand Down
25 changes: 14 additions & 11 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,19 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
CheckpointInterval: cfg.LeaseCheckpointInterval,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})

tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
func(index uint64) <-chan struct{} {
return srv.applyWait.Wait(index)
},
)
if err != nil {
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
return nil, err
}
srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost))

srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
if beExist {
kvindex := srv.kv.ConsistentIndex()
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
Expand All @@ -539,16 +551,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
}()

srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
func(index uint64) <-chan struct{} {
return srv.applyWait.Wait(index)
},
)
if err != nil {
cfg.Logger.Warn("failed to create token provider", zap.Error(err))
return nil, err
}
srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost))

if num := cfg.AutoCompactionRetention; num != 0 {
srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ func TestSnapshot(t *testing.T) {
r: *r,
v2store: st,
}
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
srv.be = be

ch := make(chan struct{}, 2)
Expand Down Expand Up @@ -1064,7 +1064,7 @@ func TestSnapshotOrdering(t *testing.T) {

be, tmpPath := backend.NewDefaultTmpBackend()
defer os.RemoveAll(tmpPath)
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
s.be = be

s.start()
Expand Down Expand Up @@ -1125,7 +1125,7 @@ func TestTriggerSnap(t *testing.T) {
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}

srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &srv.consistIndex, mvcc.StoreConfig{})
srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
srv.be = be

srv.start()
Expand Down Expand Up @@ -1197,7 +1197,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
defer func() {
os.RemoveAll(tmpPath)
}()
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, &s.consistIndex, mvcc.StoreConfig{})
s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
s.be = be

s.start()
Expand Down
2 changes: 1 addition & 1 deletion mvcc/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ func TestKVSnapshot(t *testing.T) {

func TestWatchableKVWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, StoreConfig{}))
s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
defer cleanup(s, b, tmpPath)

w := s.NewWatchStream()
Expand Down
11 changes: 8 additions & 3 deletions mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package mvcc

import (
"go.etcd.io/etcd/auth"
"sync"
"time"

Expand Down Expand Up @@ -69,11 +70,11 @@ type watchableStore struct {
// cancel operations.
type cancelFunc func()

func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
return newWatchableStore(lg, b, le, ig, cfg)
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
return newWatchableStore(lg, b, le, as, ig, cfg)
}

func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
if lg == nil {
lg = zap.NewNop()
}
Expand All @@ -90,6 +91,10 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig Co
// use this store as the deleter so revokes trigger watch events
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
}
if as != nil {
// TODO: encapsulating consistentindex into a separate package
as.SetConsistentIndexSyncer(s.store.saveIndex)
}
s.wg.Add(2)
go s.syncWatchersLoop()
go s.syncVictimsLoop()
Expand Down
Loading

0 comments on commit cb63341

Please sign in to comment.