From e9556a45379ef1da12e54847edb2fb3d7d566f36 Mon Sep 17 00:00:00 2001 From: Arpad Ryszka Date: Tue, 30 May 2017 05:14:23 +0200 Subject: [PATCH] two step circuit breaker (#6) * add multi step breaker control * added documentation * test breaker initialization with preserving coverage * change the 2-step breaker interface to a single Allow function with a callback * provide a separate structure for the different use case of Allow() * update documentation * review fixes: - renamed twostep breaker constructor - fixed failure test call to return an error --- gobreaker.go | 41 ++++++++++++++-- gobreaker_test.go | 122 ++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 140 insertions(+), 23 deletions(-) diff --git a/gobreaker.go b/gobreaker.go index ce960ed..bc958cf 100644 --- a/gobreaker.go +++ b/gobreaker.go @@ -115,6 +115,13 @@ type CircuitBreaker struct { expiry time.Time } +// TwoStepCircuitBreaker is like CircuitBreaker but instead of surrounding a function +// with the breaker functionality, it only checks whether a request can proceed and +// expects the caller to report the outcome in a separate step using a callback. +type TwoStepCircuitBreaker struct { + cb *CircuitBreaker +} + // NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings. func NewCircuitBreaker(st Settings) *CircuitBreaker { cb := new(CircuitBreaker) @@ -146,6 +153,13 @@ func NewCircuitBreaker(st Settings) *CircuitBreaker { return cb } +// NewTwoStepCircuitBreaker returns a new TwoStepCircuitBreaker configured with the given Settings. +func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker { + return &TwoStepCircuitBreaker{ + cb: NewCircuitBreaker(st), + } +} + const defaultTimeout = time.Duration(60) * time.Second func defaultReadyToTrip(counts Counts) bool { @@ -176,16 +190,35 @@ func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, defer func() { e := recover() if e != nil { - cb.afterRequest(generation, fmt.Errorf("panic in request")) + cb.afterRequest(generation, false) panic(e) } }() result, err := req() - cb.afterRequest(generation, err) + cb.afterRequest(generation, err == nil) return result, err } +// Allow checks if a new request can proceed. It returns a callback that should be used to +// register the success or failure in a separate step. If the Circuit Breaker doesn't allow +// requests it returns an error. +func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error) { + generation, err := tscb.cb.beforeRequest() + if err != nil { + return nil, err + } + + return func(success bool) { + tscb.cb.afterRequest(generation, success) + }, nil +} + +// State returns the current state of the TwoStepCircuitBreaker. +func (tscb *TwoStepCircuitBreaker) State() State { + return tscb.cb.State() +} + func (cb *CircuitBreaker) beforeRequest() (uint64, error) { cb.mutex.Lock() defer cb.mutex.Unlock() @@ -203,7 +236,7 @@ func (cb *CircuitBreaker) beforeRequest() (uint64, error) { return generation, nil } -func (cb *CircuitBreaker) afterRequest(before uint64, err error) { +func (cb *CircuitBreaker) afterRequest(before uint64, success bool) { cb.mutex.Lock() defer cb.mutex.Unlock() @@ -213,7 +246,7 @@ func (cb *CircuitBreaker) afterRequest(before uint64, err error) { return } - if err == nil { + if success { cb.onSuccess(state, now) } else { cb.onFailure(state, now) diff --git a/gobreaker_test.go b/gobreaker_test.go index 8373479..68655f2 100644 --- a/gobreaker_test.go +++ b/gobreaker_test.go @@ -43,6 +43,16 @@ func succeedLater(cb *CircuitBreaker, delay time.Duration) <-chan error { return ch } +func succeed2Step(cb *TwoStepCircuitBreaker) error { + done, err := cb.Allow() + if err != nil { + return err + } + + done(true) + return nil +} + func fail(cb *CircuitBreaker) error { msg := "fail" _, err := cb.Execute(func() (interface{}, error) { return nil, fmt.Errorf(msg) }) @@ -52,11 +62,47 @@ func fail(cb *CircuitBreaker) error { return err } +func fail2Step(cb *TwoStepCircuitBreaker) error { + done, err := cb.Allow() + if err != nil { + return err + } + + done(false) + return nil +} + func causePanic(cb *CircuitBreaker) error { _, err := cb.Execute(func() (interface{}, error) { panic("oops"); return nil, nil }) return err } +func newCustom() *CircuitBreaker { + var customSt Settings + customSt.Name = "cb" + customSt.MaxRequests = 3 + customSt.Interval = time.Duration(30) * time.Second + customSt.Timeout = time.Duration(90) * time.Second + customSt.ReadyToTrip = func(counts Counts) bool { + numReqs := counts.Requests + failureRatio := float64(counts.TotalFailures) / float64(numReqs) + + counts.clear() // no effect on customCB.counts + + return numReqs >= 3 && failureRatio >= 0.6 + } + customSt.OnStateChange = func(name string, from State, to State) { + stateChange = StateChange{name, from, to} + } + + return NewCircuitBreaker(customSt) +} + +func init() { + defaultCB = NewCircuitBreaker(Settings{}) + customCB = newCustom() +} + func TestStateConstants(t *testing.T) { assert.Equal(t, State(0), StateClosed) assert.Equal(t, State(1), StateHalfOpen) @@ -69,8 +115,7 @@ func TestStateConstants(t *testing.T) { } func TestNewCircuitBreaker(t *testing.T) { - var defaultSt Settings - defaultCB = NewCircuitBreaker(defaultSt) + defaultCB := NewCircuitBreaker(Settings{}) assert.Equal(t, "", defaultCB.name) assert.Equal(t, uint32(1), defaultCB.maxRequests) assert.Equal(t, time.Duration(0), defaultCB.interval) @@ -81,23 +126,7 @@ func TestNewCircuitBreaker(t *testing.T) { assert.Equal(t, Counts{0, 0, 0, 0, 0}, defaultCB.counts) assert.True(t, defaultCB.expiry.IsZero()) - var customSt Settings - customSt.Name = "cb" - customSt.MaxRequests = 3 - customSt.Interval = time.Duration(30) * time.Second - customSt.Timeout = time.Duration(90) * time.Second - customSt.ReadyToTrip = func(counts Counts) bool { - numReqs := counts.Requests - failureRatio := float64(counts.TotalFailures) / float64(numReqs) - - counts.clear() // no effect on customCB.counts - - return numReqs >= 3 && failureRatio >= 0.6 - } - customSt.OnStateChange = func(name string, from State, to State) { - stateChange = StateChange{name, from, to} - } - customCB = NewCircuitBreaker(customSt) + customCB := newCustom() assert.Equal(t, "cb", customCB.name) assert.Equal(t, uint32(3), customCB.maxRequests) assert.Equal(t, time.Duration(30)*time.Second, customCB.interval) @@ -211,6 +240,61 @@ func TestCustomCircuitBreaker(t *testing.T) { assert.Equal(t, StateChange{"cb", StateHalfOpen, StateClosed}, stateChange) } +func TestTwoStepCircuitBreaker(t *testing.T) { + tscb := NewTwoStepCircuitBreaker(Settings{}) + for i := 0; i < 5; i++ { + assert.Nil(t, fail2Step(tscb)) + } + + assert.Equal(t, StateClosed, tscb.State()) + assert.Equal(t, Counts{5, 0, 5, 0, 5}, tscb.cb.counts) + + assert.Nil(t, succeed2Step(tscb)) + assert.Equal(t, StateClosed, tscb.State()) + assert.Equal(t, Counts{6, 1, 5, 1, 0}, tscb.cb.counts) + + assert.Nil(t, fail2Step(tscb)) + assert.Equal(t, StateClosed, tscb.State()) + assert.Equal(t, Counts{7, 1, 6, 0, 1}, tscb.cb.counts) + + // StateClosed to StateOpen + for i := 0; i < 5; i++ { + assert.Nil(t, fail2Step(tscb)) // 6 consecutive failures + } + assert.Equal(t, StateOpen, tscb.State()) + assert.Equal(t, Counts{0, 0, 0, 0, 0}, tscb.cb.counts) + assert.False(t, tscb.cb.expiry.IsZero()) + + assert.Error(t, succeed2Step(tscb)) + assert.Error(t, fail2Step(tscb)) + assert.Equal(t, Counts{0, 0, 0, 0, 0}, tscb.cb.counts) + + pseudoSleep(tscb.cb, time.Duration(59)*time.Second) + assert.Equal(t, StateOpen, tscb.State()) + + // StateOpen to StateHalfOpen + pseudoSleep(tscb.cb, time.Duration(1)*time.Second) // over Timeout + assert.Equal(t, StateHalfOpen, tscb.State()) + assert.True(t, tscb.cb.expiry.IsZero()) + + // StateHalfOpen to StateOpen + assert.Nil(t, fail2Step(tscb)) + assert.Equal(t, StateOpen, tscb.State()) + assert.Equal(t, Counts{0, 0, 0, 0, 0}, tscb.cb.counts) + assert.False(t, tscb.cb.expiry.IsZero()) + + // StateOpen to StateHalfOpen + pseudoSleep(tscb.cb, time.Duration(60)*time.Second) + assert.Equal(t, StateHalfOpen, tscb.State()) + assert.True(t, tscb.cb.expiry.IsZero()) + + // StateHalfOpen to StateClosed + assert.Nil(t, succeed2Step(tscb)) + assert.Equal(t, StateClosed, tscb.State()) + assert.Equal(t, Counts{0, 0, 0, 0, 0}, tscb.cb.counts) + assert.True(t, tscb.cb.expiry.IsZero()) +} + func TestPanicInRequest(t *testing.T) { assert.Panics(t, func() { causePanic(defaultCB) }) assert.Equal(t, Counts{1, 0, 1, 0, 1}, defaultCB.counts)