Commit 294e714

*: fix cherry-pick conflict

tangcong committed Apr 6, 2020
1 parent 64fc4cc commit 294e714

Showing 12 changed files with 50 additions and 110 deletions.
11 changes: 5 additions & 6 deletions auth/store.go
@@ -209,7 +209,6 @@ type authStore struct {

tokenProvider TokenProvider
syncConsistentIndex saveConsistentIndexFunc
-bcryptCost int // the algorithm cost / strength for hashing auth passwords
}

func (as *authStore) SetConsistentIndexSyncer(syncer saveConsistentIndexFunc) {
@@ -767,11 +766,11 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE
}
rev := as.Revision()
if revision < rev {
-as.lg.Warn("request auth revision is less than current node auth revision",
-zap.Uint64("current node auth revision", rev),
-zap.Uint64("request auth revision", revision),
-zap.ByteString("request key", key),
-zap.Error(ErrAuthOldRevision))
+plog.Warningf("request auth revision is less than current node auth revision,"+
+"current node auth revision is %d,"+
+"request auth revision is %d,"+
+"request key is %s, "+
+"err is %v", rev, revision, key, ErrAuthOldRevision)
return ErrAuthOldRevision
}

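Note on the hunk above: the backport replaces master's structured zap logging with the printf-style plog (capnslog) logger that the release-3.3 branch still uses. A minimal sketch of the two styles side by side — the logger setup and helper function here are illustrative, not taken from this commit:

import (
	"github.com/coreos/pkg/capnslog"
	"go.uber.org/zap"
)

var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "auth")

// logOldRevision is a hypothetical helper contrasting the two styles.
func logOldRevision(rev, reqRev uint64, err error) {
	// release-3.3 style: one format string with positional arguments.
	plog.Warningf("request auth revision is less than current node auth revision,"+
		"current node auth revision is %d,request auth revision is %d,err is %v",
		rev, reqRev, err)

	// master style: a message plus typed, structured fields.
	zap.NewExample().Warn("request auth revision is less than current node auth revision",
		zap.Uint64("current node auth revision", rev),
		zap.Uint64("request auth revision", reqRev),
		zap.Error(err))
}
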
2 changes: 1 addition & 1 deletion etcdserver/backend.go
@@ -71,7 +71,7 @@ func openBackend(cfg ServerConfig) backend.Backend {
// case, replace the db with the snapshot db sent by the leader.
func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
var cIndex consistentIndex
-kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, nil, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
+kv := mvcc.New(oldbe, &lease.FakeLessor{}, nil, &cIndex)
defer kv.Close()
if snapshot.Metadata.Index <= kv.ConsistentIndex() {
return oldbe, nil
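
For orientation, the constructor shapes on the two sides of this backport, reconstructed from the call sites in this diff (master's signature is the one the cherry-picked commit was written against):

// release-3.3, the target branch of this cherry-pick:
func New(b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter) ConsistentWatchableKV

// master at the time, with a logger and store configuration:
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV

Every mvcc.New and newWatchableStore change below follows this same pattern: drop the *zap.Logger first argument and the trailing StoreConfig.
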
17 changes: 5 additions & 12 deletions etcdserver/server.go
@@ -447,16 +447,9 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {

// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
-srv.lessor = lease.NewLessor(
-srv.getLogger(),
-srv.be,
-lease.LessorConfig{
-MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
-CheckpointInterval: cfg.LeaseCheckpointInterval,
-ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
-})
-
-tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
+srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
+
+tp, err := auth.NewTokenProvider(cfg.AuthToken,
func(index uint64) <-chan struct{} {
return srv.applyWait.Wait(index)
},
@@ -465,9 +458,9 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
plog.Warningf("failed to create token provider,err is %v", err)
return nil, err
}
-srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost))
+srv.authStore = auth.NewAuthStore(srv.be, tp)

-srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.authStore, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
+srv.kv = mvcc.New(srv.be, srv.lessor, srv.authStore, &srv.consistIndex)
if beExist {
kvindex := srv.kv.ConsistentIndex()
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
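
The comment retained in the hunk above states the ordering constraint: the lessor must be recovered before the mvcc store, because mvcc.New reattaches recovered keys to their leases. A condensed sketch of the post-fix wiring on this branch, assembled from the added lines (error handling and surrounding setup elided):

	srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))

	tp, err := auth.NewTokenProvider(cfg.AuthToken,
		func(index uint64) <-chan struct{} { return srv.applyWait.Wait(index) },
	)
	if err != nil {
		return nil, err
	}
	srv.authStore = auth.NewAuthStore(srv.be, tp)
	srv.kv = mvcc.New(srv.be, srv.lessor, srv.authStore, &srv.consistIndex)
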
8 changes: 4 additions & 4 deletions etcdserver/server_test.go
@@ -916,7 +916,7 @@ func TestSnapshot(t *testing.T) {
r: *r,
store: st,
}
-srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
+srv.kv = mvcc.New(be, &lease.FakeLessor{}, nil, &srv.consistIndex)
srv.be = be

ch := make(chan struct{}, 2)
@@ -994,7 +994,7 @@ func TestSnapshotOrdering(t *testing.T) {

be, tmpPath := backend.NewDefaultTmpBackend()
defer os.RemoveAll(tmpPath)
-s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
+s.kv = mvcc.New(be, &lease.FakeLessor{}, nil, &s.consistIndex)
s.be = be

s.start()
@@ -1052,7 +1052,7 @@ func TestTriggerSnap(t *testing.T) {
}
srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}

-srv.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &srv.consistIndex, mvcc.StoreConfig{})
+srv.kv = mvcc.New(be, &lease.FakeLessor{}, nil, &srv.consistIndex)
srv.be = be

srv.start()
@@ -1121,7 +1121,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
defer func() {
os.RemoveAll(tmpPath)
}()
-s.kv = mvcc.New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &s.consistIndex, mvcc.StoreConfig{})
+s.kv = mvcc.New(be, &lease.FakeLessor{}, nil, &s.consistIndex)
s.be = be

s.start()
12 changes: 3 additions & 9 deletions etcdserver/util.go
@@ -109,22 +109,16 @@ func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg pro
warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err)
}

-func warnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
+func warnOfFailedRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
var resp string
if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
}
d := time.Since(now)
-plog.Warningf(
-"failed to apply request",
-zap.Duration("took", d),
-zap.String("request", reqStringer.String()),
-zap.String("response", resp),
-zap.Error(err),
-)
+plog.Warningf("failed to apply request,took %v,request %s,resp %s,err is %v", d, reqStringer.String(), resp, err)
}

-func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
+func warnOfExpensiveReadOnlyTxnRequest(now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
reqStringer := pb.NewLoggableTxnRequest(r)
var resp string
if !isNil(txnResponse) {
8 changes: 5 additions & 3 deletions integration/v3_watch_test.go
@@ -781,9 +781,11 @@ func testV3WatchMultipleEventsTxn(t *testing.T, startRev int64) {

type eventsSortByKey []*mvccpb.Event

-func (evs eventsSortByKey) Len() int { return len(evs) }
-func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] }
-func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 }
+func (evs eventsSortByKey) Len() int { return len(evs) }
+func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] }
+func (evs eventsSortByKey) Less(i, j int) bool {
+return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0
+}

func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
defer testutil.AfterTest(t)
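
The reformatting above is gofmt splitting the over-long Less method across lines; eventsSortByKey still implements sort.Interface. A hypothetical usage sketch (sortEvents is not part of this commit):

import (
	"sort"

	"github.com/coreos/etcd/mvcc/mvccpb"
)

// sortEvents orders watch events by key so a test can compare them
// against expected output regardless of delivery order.
func sortEvents(events []*mvccpb.Event) {
	sort.Sort(eventsSortByKey(events))
}
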
2 changes: 1 addition & 1 deletion mvcc/kv_test.go
@@ -710,7 +710,7 @@ func TestKVSnapshot(t *testing.T) {

func TestWatchableKVWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
-s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{}))
+s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil, nil))
defer cleanup(s, b, tmpPath)

w := s.NewWatchStream()
8 changes: 4 additions & 4 deletions mvcc/watchable_store.go
@@ -15,10 +15,10 @@
package mvcc

import (
"go.etcd.io/etcd/auth"
"sync"
"time"

"github.com/coreos/etcd/auth"
"github.com/coreos/etcd/lease"
"github.com/coreos/etcd/mvcc/backend"
"github.com/coreos/etcd/mvcc/mvccpb"
@@ -68,11 +68,11 @@ type watchableStore struct {
// cancel operations.
type cancelFunc func()

-func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) ConsistentWatchableKV {
-return newWatchableStore(lg, b, le, as, ig, cfg)
+func New(b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter) ConsistentWatchableKV {
+return newWatchableStore(b, le, as, ig)
}

-func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter, cfg StoreConfig) *watchableStore {
+func newWatchableStore(b backend.Backend, le lease.Lessor, as auth.AuthStore, ig ConsistentIndexGetter) *watchableStore {
s := &watchableStore{
store: NewStore(b, le, ig),
victimc: make(chan struct{}, 1),
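
The import hunk above does two things: it points the auth package back at the github.com/coreos/etcd module path (go.etcd.io/etcd is the renamed path used on master; release-3.3 predates the rename), and it restores gofmt's conventional import grouping, as in this sketch:

import (
	// Standard-library packages come first...
	"sync"
	"time"

	// ...then external packages, separated by a blank line.
	"github.com/coreos/etcd/auth"
	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/mvcc/mvccpb"
)
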
8 changes: 4 additions & 4 deletions mvcc/watchable_store_bench_test.go
@@ -25,7 +25,7 @@ import (

func BenchmarkWatchableStorePut(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
-s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
+s := New(be, &lease.FakeLessor{}, nil, nil)
defer cleanup(s, be, tmpPath)

// arbitrary number of bytes
@@ -46,7 +46,7 @@ func BenchmarkWatchableStorePut(b *testing.B) {
func BenchmarkWatchableStoreTxnPut(b *testing.B) {
var i fakeConsistentIndex
be, tmpPath := backend.NewDefaultTmpBackend()
-s := New(zap.NewExample(), be, &lease.FakeLessor{}, nil, &i, StoreConfig{})
+s := New(be, &lease.FakeLessor{}, nil, &i)
defer cleanup(s, be, tmpPath)

// arbitrary number of bytes
@@ -67,7 +67,7 @@ func BenchmarkWatchableStoreTxnPut(b *testing.B) {
// many synced watchers receiving a Put notification.
func BenchmarkWatchableStoreWatchSyncPut(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
-s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
+s := newWatchableStore(be, &lease.FakeLessor{}, nil, nil)
defer cleanup(s, be, tmpPath)

k := []byte("testkey")
@@ -162,7 +162,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {

func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
-s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
+s := newWatchableStore(be, &lease.FakeLessor{}, nil, nil)

defer func() {
s.store.Close()
66 changes: 11 additions & 55 deletions mvcc/watchable_store_test.go
@@ -30,11 +30,7 @@ import (

func TestWatch(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
-<<<<<<< HEAD
-s := newWatchableStore(b, &lease.FakeLessor{}, nil)
-=======
-s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
->>>>>>> *: fix auth revision corruption bug
+s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil)

defer func() {
s.store.Close()
@@ -56,11 +52,7 @@ func TestWatch(t *testing.T) {

func TestNewWatcherCancel(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
-<<<<<<< HEAD
-s := newWatchableStore(b, &lease.FakeLessor{}, nil)
-=======
-s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
->>>>>>> *: fix auth revision corruption bug
+s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil)

defer func() {
s.store.Close()
@@ -230,11 +222,7 @@ func TestSyncWatchers(t *testing.T) {
// TestWatchCompacted tests a watcher that watches on a compacted revision.
func TestWatchCompacted(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
-<<<<<<< HEAD
-s := newWatchableStore(b, &lease.FakeLessor{}, nil)
-=======
-s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
->>>>>>> *: fix auth revision corruption bug
+s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil)

defer func() {
s.store.Close()
@@ -271,11 +259,7 @@ func TestWatchCompacted(t *testing.T) {

func TestWatchFutureRev(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
-<<<<<<< HEAD
-s := newWatchableStore(b, &lease.FakeLessor{}, nil)
-=======
-s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
->>>>>>> *: fix auth revision corruption bug
+s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil)

defer func() {
s.store.Close()
@@ -316,23 +300,15 @@ func TestWatchRestore(t *testing.T) {
test := func(delay time.Duration) func(t *testing.T) {
return func(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
-<<<<<<< HEAD
-s := newWatchableStore(b, &lease.FakeLessor{}, nil)
-=======
-s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
->>>>>>> *: fix auth revision corruption bug
+s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil)
defer cleanup(s, b, tmpPath)

testKey := []byte("foo")
testValue := []byte("bar")
rev := s.Put(testKey, testValue, lease.NoLease)

newBackend, newPath := backend.NewDefaultTmpBackend()
-<<<<<<< HEAD
-newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
-=======
-newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, nil, StoreConfig{})
->>>>>>> *: fix auth revision corruption bug
+newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil, nil)
defer cleanup(newStore, newBackend, newPath)

w := newStore.NewWatchStream()
@@ -370,19 +346,11 @@ func TestWatchRestore(t *testing.T) {
// 5. choose the watcher from step 1, without panic
func TestWatchRestoreSyncedWatcher(t *testing.T) {
b1, b1Path := backend.NewDefaultTmpBackend()
-<<<<<<< HEAD
-s1 := newWatchableStore(b1, &lease.FakeLessor{}, nil)
+s1 := newWatchableStore(b1, &lease.FakeLessor{}, nil, nil)
defer cleanup(s1, b1, b1Path)

b2, b2Path := backend.NewDefaultTmpBackend()
-s2 := newWatchableStore(b2, &lease.FakeLessor{}, nil)
-=======
-s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, nil, nil, StoreConfig{})
-defer cleanup(s1, b1, b1Path)
-
-b2, b2Path := backend.NewDefaultTmpBackend()
-s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, nil, nil, StoreConfig{})
->>>>>>> *: fix auth revision corruption bug
+s2 := newWatchableStore(b2, &lease.FakeLessor{}, nil, nil)
defer cleanup(s2, b2, b2Path)

testKey, testValue := []byte("foo"), []byte("bar")
Expand Down Expand Up @@ -429,11 +397,7 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) {
// TestWatchBatchUnsynced tests batching on unsynced watchers
func TestWatchBatchUnsynced(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
-<<<<<<< HEAD
-s := newWatchableStore(b, &lease.FakeLessor{}, nil)
-=======
-s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
->>>>>>> *: fix auth revision corruption bug
+s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil)

oldMaxRevs := watchBatchMaxRevs
defer func() {
@@ -567,11 +531,7 @@ func TestWatchVictims(t *testing.T) {
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync

b, tmpPath := backend.NewDefaultTmpBackend()
-<<<<<<< HEAD
-s := newWatchableStore(b, &lease.FakeLessor{}, nil)
-=======
-s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
->>>>>>> *: fix auth revision corruption bug
+s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil)

defer func() {
s.store.Close()
@@ -649,11 +609,7 @@ func TestWatchVictims(t *testing.T) {
// canceling its watches.
func TestStressWatchCancelClose(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
-<<<<<<< HEAD
-s := newWatchableStore(b, &lease.FakeLessor{}, nil)
-=======
-s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
->>>>>>> *: fix auth revision corruption bug
+s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil)

defer func() {
s.store.Close()
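
The blocks deleted throughout this file are unresolved conflict markers that the earlier cherry-pick of "*: fix auth revision corruption bug" left in the tree: the lines between <<<<<<< HEAD and ======= are this branch's version, and the lines between ======= and >>>>>>> are the incoming commit's version. Each resolution keeps this branch's logger-free, config-free signature and adds the auth-store argument that the backported fix introduces, schematically:

<<<<<<< HEAD
	s := newWatchableStore(b, &lease.FakeLessor{}, nil)
=======
	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil, nil, StoreConfig{})
>>>>>>> *: fix auth revision corruption bug

resolves to:

	s := newWatchableStore(b, &lease.FakeLessor{}, nil, nil)
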
6 changes: 1 addition & 5 deletions mvcc/watcher_bench_test.go
@@ -24,11 +24,7 @@ import (

func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
-<<<<<<< HEAD
-watchable := newWatchableStore(be, &lease.FakeLessor{}, nil)
-=======
-watchable := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil, nil, StoreConfig{})
->>>>>>> *: fix auth revision corruption bug
+watchable := newWatchableStore(be, &lease.FakeLessor{}, nil, nil)

defer cleanup(watchable, be, tmpPath)

(The diff for the remaining changed file did not load on this page.)