From abb20ec51f58c1844e26400136646ec81e64b423 Mon Sep 17 00:00:00 2001 From: Hitoshi Mitake Date: Wed, 15 Jun 2016 21:31:07 -0700 Subject: [PATCH] etcdserver, pkg: skip needless log entry applying This commit lets etcdserver skip needless log entry applying. If the result of log applying isn't required by the node (client that issued the request isn't talking with the node) and the operation has no side effects, applying can be skipped. It would contribute to reduce disk I/O on followers and be useful for a cluster that processes much serializable get. --- etcdserver/apply.go | 4 ++++ etcdserver/server.go | 10 +++++++++- pkg/mock/mockwait/wait_recorder.go | 4 ++++ pkg/wait/wait.go | 11 +++++++++++ pkg/wait/wait_test.go | 23 +++++++++++++++++++++++ 5 files changed, 51 insertions(+), 1 deletion(-) diff --git a/etcdserver/apply.go b/etcdserver/apply.go index ed2027c82e2..b2919b9a8a6 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -784,3 +784,7 @@ func compareInt64(a, b int64) int { func isGteRange(rangeEnd []byte) bool { return len(rangeEnd) == 1 && rangeEnd[0] == 0 } + +func noSideEffect(r *pb.InternalRaftRequest) bool { + return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil +} diff --git a/etcdserver/server.go b/etcdserver/server.go index d535843521d..a0d70073262 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1082,8 +1082,16 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { id = raftReq.Header.ID } - ar := s.applyV3.Apply(&raftReq) + var ar *applyResult + if s.w.IsRegistered(id) || !noSideEffect(&raftReq) { + ar = s.applyV3.Apply(&raftReq) + } s.setAppliedIndex(e.Index) + + if ar == nil { + return + } + if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 { s.w.Trigger(id, ar) return diff --git a/pkg/mock/mockwait/wait_recorder.go b/pkg/mock/mockwait/wait_recorder.go index efc198a19ea..f9c820092f0 100644 --- a/pkg/mock/mockwait/wait_recorder.go +++ b/pkg/mock/mockwait/wait_recorder.go @@ -41,3 +41,7 @@ func (w *waitRecorder) Register(id uint64) <-chan interface{} { func (w *waitRecorder) Trigger(id uint64, x interface{}) { w.Record(testutil.Action{Name: "Trigger"}) } + +func (w *waitRecorder) IsRegistered(id uint64) bool { + panic("waitRecorder.IsRegistered() shouldn't be called") +} diff --git a/pkg/wait/wait.go b/pkg/wait/wait.go index a341c02708d..0f31eeb9790 100644 --- a/pkg/wait/wait.go +++ b/pkg/wait/wait.go @@ -24,6 +24,7 @@ import ( type Wait interface { Register(id uint64) <-chan interface{} Trigger(id uint64, x interface{}) + IsRegistered(id uint64) bool } type List struct { @@ -59,6 +60,13 @@ func (w *List) Trigger(id uint64, x interface{}) { } } +func (w *List) IsRegistered(id uint64) bool { + w.l.Lock() + defer w.l.Unlock() + _, ok := w.m[id] + return ok +} + type waitWithResponse struct { ch <-chan interface{} } @@ -71,3 +79,6 @@ func (w *waitWithResponse) Register(id uint64) <-chan interface{} { return w.ch } func (w *waitWithResponse) Trigger(id uint64, x interface{}) {} +func (w *waitWithResponse) IsRegistered(id uint64) bool { + panic("waitWithResponse.IsRegistered() shouldn't be called") +} diff --git a/pkg/wait/wait_test.go b/pkg/wait/wait_test.go index f9abec5a169..54395cb360c 100644 --- a/pkg/wait/wait_test.go +++ b/pkg/wait/wait_test.go @@ -77,3 +77,26 @@ func TestTriggerDupSuppression(t *testing.T) { t.Errorf("unexpected non-nil value: %v (%T)", g, g) } } + +func TestIsRegistered(t *testing.T) { + wt := New() + + wt.Register(0) + wt.Register(1) + wt.Register(2) + + for i := uint64(0); i < 3; i++ { + if !wt.IsRegistered(i) { + t.Errorf("event ID %d isn't registered", i) + } + } + + if wt.IsRegistered(4) { + t.Errorf("event ID 4 shouldn't be registered") + } + + wt.Trigger(0, "foo") + if wt.IsRegistered(0) { + t.Errorf("event ID 0 is already triggered, shouldn't be registered") + } +}