Skip to content

Commit

Permalink
etcdserver: skip range requests in txn if the result is needless
Browse files Browse the repository at this point in the history
If a server isn't serving txn requests from a client, the server
doesn't need the result of range requests in the txn.

This is a succeeding commit of
etcd-io#5689
  • Loading branch information
mitake committed Jul 22, 2016
1 parent ec5c5d9 commit 645d7b6
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 19 deletions.
26 changes: 15 additions & 11 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ type applyResult struct {

// applierV3 is the interface for processing V3 raft messages
type applierV3 interface {
Apply(r *pb.InternalRaftRequest) *applyResult
Apply(r *pb.InternalRaftRequest, canSkip bool) *applyResult

Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, error)
Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResponse, error)
DeleteRange(txnID int64, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error)
Txn(rt *pb.TxnRequest, canSkip bool) (*pb.TxnResponse, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error)

LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
Expand Down Expand Up @@ -93,7 +93,7 @@ func (s *EtcdServer) newApplierV3() applierV3 {
)
}

func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, canSkip bool) *applyResult {
ar := &applyResult{}

// call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls
Expand All @@ -105,7 +105,7 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
case r.DeleteRange != nil:
ar.resp, ar.err = a.s.applyV3.DeleteRange(noTxn, r.DeleteRange)
case r.Txn != nil:
ar.resp, ar.err = a.s.applyV3.Txn(r.Txn)
ar.resp, ar.err = a.s.applyV3.Txn(r.Txn, canSkip)
case r.Compaction != nil:
ar.resp, ar.physc, ar.err = a.s.applyV3.Compaction(r.Compaction)
case r.LeaseGrant != nil:
Expand Down Expand Up @@ -323,7 +323,7 @@ func (a *applierV3backend) Range(txnID int64, r *pb.RangeRequest) (*pb.RangeResp
return resp, nil
}

func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
func (a *applierV3backend) Txn(rt *pb.TxnRequest, canSkip bool) (*pb.TxnResponse, error) {
ok := true
for _, c := range rt.Compare {
if _, ok = a.applyCompare(c); !ok {
Expand Down Expand Up @@ -363,7 +363,7 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
if reqs[i].GetRequestRange() == nil {
changedKV = true
}
resps[i] = a.applyUnion(txnID, reqs[i])
resps[i] = a.applyUnion(txnID, reqs[i], canSkip)
}

if changedKV {
Expand Down Expand Up @@ -448,9 +448,13 @@ func (a *applierV3backend) applyCompare(c *pb.Compare) (int64, bool) {
return rev, true
}

func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp) *pb.ResponseOp {
func (a *applierV3backend) applyUnion(txnID int64, union *pb.RequestOp, canSkip bool) *pb.ResponseOp {
switch tv := union.Request.(type) {
case *pb.RequestOp_RequestRange:
if canSkip {
return nil
}

if tv.RequestRange != nil {
resp, err := a.Range(txnID, tv.RequestRange)
if err != nil {
Expand Down Expand Up @@ -574,11 +578,11 @@ func (a *applierV3Capped) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, e
return nil, ErrNoSpace
}

func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) {
func (a *applierV3Capped) Txn(r *pb.TxnRequest, canSkip bool) (*pb.TxnResponse, error) {
if a.q.Cost(r) > 0 {
return nil, ErrNoSpace
}
return a.applierV3.Txn(r)
return a.applierV3.Txn(r, canSkip)
}

func (a *applierV3Capped) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
Expand Down Expand Up @@ -673,9 +677,9 @@ func (a *quotaApplierV3) Put(txnID int64, p *pb.PutRequest) (*pb.PutResponse, er
return resp, err
}

func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
func (a *quotaApplierV3) Txn(rt *pb.TxnRequest, canSkip bool) (*pb.TxnResponse, error) {
ok := a.q.Available(rt)
resp, err := a.applierV3.Txn(rt)
resp, err := a.applierV3.Txn(rt, canSkip)
if err == nil && !ok {
err = ErrNoSpace
}
Expand Down
8 changes: 4 additions & 4 deletions etcdserver/apply_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newAuthApplierV3(as auth.AuthStore, base applierV3) *authApplierV3 {
return &authApplierV3{applierV3: base, as: as}
}

func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult {
func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, canSkip bool) *applyResult {
aa.mu.Lock()
defer aa.mu.Unlock()
if r.Header != nil {
Expand All @@ -52,7 +52,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult {
return &applyResult{err: err}
}
}
ret := aa.applierV3.Apply(r)
ret := aa.applierV3.Apply(r, canSkip)
aa.authInfo.Username = ""
aa.authInfo.Revision = 0
return ret
Expand Down Expand Up @@ -135,7 +135,7 @@ func (aa *authApplierV3) checkTxnReqsPermission(reqs []*pb.RequestOp) error {
return nil
}

func (aa *authApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
func (aa *authApplierV3) Txn(rt *pb.TxnRequest, canSkip bool) (*pb.TxnResponse, error) {
for _, c := range rt.Compare {
if err := aa.as.IsRangePermitted(&aa.authInfo, c.Key, nil); err != nil {
return nil, err
Expand All @@ -149,7 +149,7 @@ func (aa *authApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) {
return nil, err
}

return aa.applierV3.Txn(rt)
return aa.applierV3.Txn(rt, canSkip)
}

func needAdminPermission(r *pb.InternalRaftRequest) bool {
Expand Down
5 changes: 3 additions & 2 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,8 +1080,9 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
}

var ar *applyResult
if s.w.IsRegistered(id) || !noSideEffect(&raftReq) {
ar = s.applyV3.Apply(&raftReq)
canSkip := !s.w.IsRegistered(id)
if !canSkip || !noSideEffect(&raftReq) {
ar = s.applyV3.Apply(&raftReq, canSkip)
}

if ar == nil {
Expand Down
4 changes: 2 additions & 2 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
hdr.AuthRevision = authInfo.Revision
}

result = s.applyV3.Apply(&pb.InternalRaftRequest{Header: hdr, Range: r})
result = s.applyV3.Apply(&pb.InternalRaftRequest{Header: hdr, Range: r}, false)

if result.err != nil {
if result.err == auth.ErrAuthOldRevision {
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
hdr.AuthRevision = authInfo.Revision
}

result = s.applyV3.Apply(&pb.InternalRaftRequest{Header: hdr, Txn: r})
result = s.applyV3.Apply(&pb.InternalRaftRequest{Header: hdr, Txn: r}, false)

if result.err != nil {
if result.err == auth.ErrAuthOldRevision {
Expand Down

0 comments on commit 645d7b6

Please sign in to comment.