From d7c3bd268442191d091424f581edb96926cd86c7 Mon Sep 17 00:00:00 2001 From: xrmzju Date: Wed, 16 Sep 2020 17:19:00 +0800 Subject: [PATCH] mvcc: prune revisions before send backend readTx when rangekeys --- etcdserver/apply.go | 36 ++++++++++----------- mvcc/kv.go | 18 +++++++++-- mvcc/kvstore_txn.go | 65 ++++++++++++++++++++++++++++++++++++-- mvcc/kvstore_txn_test.go | 68 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 161 insertions(+), 26 deletions(-) create mode 100644 mvcc/kvstore_txn_test.go diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 201486a71af..3d73f949633 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -299,8 +299,8 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra } limit := r.Limit - if r.SortOrder != pb.RangeRequest_NONE || - r.MinModRevision != 0 || r.MaxModRevision != 0 || + if (r.SortOrder != pb.RangeRequest_NONE && r.SortTarget != pb.RangeRequest_MOD) || + //r.MinModRevision != 0 || r.MaxModRevision != 0 || r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 { // fetch everything; sort and truncate afterwards limit = 0 @@ -311,9 +311,13 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra } ro := mvcc.RangeOptions{ - Limit: limit, - Rev: r.Revision, - Count: r.CountOnly, + Limit: limit, + Rev: r.Revision, + Count: r.CountOnly, + MaxModRevision: r.MaxModRevision, + MinModRevision: r.MinModRevision, + SortByModRevision: r.SortTarget == pb.RangeRequest_MOD, + SortOrder: mvcc.SortOrder(r.SortOrder), } rr, err := txn.Range(r.Key, mkGteRange(r.RangeEnd), ro) @@ -321,14 +325,6 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra return nil, err } - if r.MaxModRevision != 0 { - f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision } - pruneKVs(rr, f) - } - if r.MinModRevision != 0 { - f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision } - pruneKVs(rr, f) - } if r.MaxCreateRevision != 0 { f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision } pruneKVs(rr, f) @@ -354,16 +350,16 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra sorter = &kvSortByVersion{&kvSort{rr.KVs}} case r.SortTarget == pb.RangeRequest_CREATE: sorter = &kvSortByCreate{&kvSort{rr.KVs}} - case r.SortTarget == pb.RangeRequest_MOD: - sorter = &kvSortByMod{&kvSort{rr.KVs}} case r.SortTarget == pb.RangeRequest_VALUE: sorter = &kvSortByValue{&kvSort{rr.KVs}} } - switch { - case sortOrder == pb.RangeRequest_ASCEND: - sort.Sort(sorter) - case sortOrder == pb.RangeRequest_DESCEND: - sort.Sort(sort.Reverse(sorter)) + if sorter != nil { + switch { + case sortOrder == pb.RangeRequest_ASCEND: + sort.Sort(sorter) + case sortOrder == pb.RangeRequest_DESCEND: + sort.Sort(sort.Reverse(sorter)) + } } } diff --git a/mvcc/kv.go b/mvcc/kv.go index e26d8a63791..5eda09b5451 100644 --- a/mvcc/kv.go +++ b/mvcc/kv.go @@ -21,10 +21,22 @@ import ( "go.etcd.io/etcd/v3/pkg/traceutil" ) +type SortOrder int + +const ( + SortNone SortOrder = iota + SortAscend + SortDescend +) + type RangeOptions struct { - Limit int64 - Rev int64 - Count bool + Limit int64 + Rev int64 + Count bool + MinModRevision int64 + MaxModRevision int64 + SortByModRevision bool + SortOrder SortOrder } type RangeResult struct { diff --git a/mvcc/kvstore_txn.go b/mvcc/kvstore_txn.go index 28e39d0c650..c110a829224 100644 --- a/mvcc/kvstore_txn.go +++ b/mvcc/kvstore_txn.go @@ -15,6 +15,8 @@ package mvcc import ( + "sort" + "go.etcd.io/etcd/v3/lease" "go.etcd.io/etcd/v3/mvcc/backend" "go.etcd.io/etcd/v3/mvcc/mvccpb" @@ -126,12 +128,25 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted } if ro.Count { - total := tr.s.kvindex.CountRevisions(key, end, rev, int(ro.Limit)) + total := tr.s.kvindex.CountRevisions(key, end, rev, rangeLimit(ro)) tr.trace.Step("count revisions from in-memory index tree") return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil } - revpairs := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit)) + + revpairs := tr.s.kvindex.Revisions(key, end, rev, rangeLimit(ro)) tr.trace.Step("range keys from in-memory index tree") + count := len(revpairs) + if ro.SortByModRevision { + sorter := revisions(revpairs) + switch ro.SortOrder { + case SortDescend: + sort.Sort(sort.Reverse(sorter)) + case SortAscend: + sort.Sort(sorter) + } + } + revpairs = pruneRevisions(revpairs, rangeOpToPrunableFuncs(ro)) + if len(revpairs) == 0 { return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil } @@ -161,7 +176,7 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions } } tr.trace.Step("range keys from bolt db") - return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil + return &RangeResult{KVs: kvs, Count: count, Rev: curRev}, nil } func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { @@ -287,3 +302,47 @@ func (tw *storeTxnWrite) delete(key []byte) { } func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes } + +func pruneRevisions(revisions []revision, prunableFuncs []func(revision) bool) (ret []revision) { + if len(prunableFuncs) == 0 { + return revisions + } + + j := 0 + for i := range revisions { + revisions[j] = revisions[i] + prunable := false + for _, f := range prunableFuncs { + if f(revisions[i]) { + prunable = true + break + } + } + if !prunable { + j++ + } + } + return revisions[:j] +} + +func rangeOpToPrunableFuncs(op RangeOptions) (funcs []func(revision) bool) { + if op.MinModRevision != 0 { + funcs = append(funcs, func(r revision) bool { + return r.main < op.MinModRevision + }) + } + if op.MaxModRevision != 0 { + funcs = append(funcs, func(r revision) bool { + return r.main > op.MaxModRevision + }) + } + return +} + +func rangeLimit(op RangeOptions) int { + // if range by modRevision sort, should fetch all revisions from underlying transaction + if op.SortByModRevision { + return 0 + } + return int(op.Limit) +} diff --git a/mvcc/kvstore_txn_test.go b/mvcc/kvstore_txn_test.go new file mode 100644 index 00000000000..c909813f4e7 --- /dev/null +++ b/mvcc/kvstore_txn_test.go @@ -0,0 +1,68 @@ +/* +Copyright 2019 The Pdd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mvcc + +import ( + "reflect" + "testing" +) + +func Test_pruneRevisions(t *testing.T) { + type args struct { + revisions []revision + ro RangeOptions + } + tests := []struct { + name string + args args + wantRet []revision + }{ + { + name: "MaxModRevision", + args: args{ + revisions: []revision{{main: 1}, {main: 2}}, + ro: RangeOptions{MaxModRevision: 1}, + }, + wantRet: []revision{{main: 1}}, + }, + { + name: "MinModRevision And MaxModRevision", + args: args{ + revisions: []revision{{main: 1}, {main: 2}, {main: 32}, {main: 4}}, + ro: RangeOptions{MinModRevision: 1, MaxModRevision: 3}, + }, + wantRet: []revision{{main: 1}, {main: 2}}, + }, + { + name: "MinModRevision", + args: args{ + revisions: []revision{{main: 1}, {main: 2}, {main: 3}}, + ro: RangeOptions{MinModRevision: 3}, + }, + wantRet: []revision{{main: 3}}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotRet := pruneRevisions(tt.args.revisions, rangeOpToPrunableFuncs(tt.args.ro)) + t.Log(gotRet) + if !reflect.DeepEqual(gotRet, tt.wantRet) { + t.Errorf("pruneRevisions() = %v, want %v", gotRet, tt.wantRet) + } + }) + } +}