Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add healthcheck router #389

Merged
merged 10 commits into from
Mar 7, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions cluster/router/condition/default_health_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package condition

import (
"math"
)

import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)

// URL parameter keys, defaults and limits used by the default health checker.
const (
// HEALTH_CHECKER is the URL parameter key whose value names the HealthChecker implementation to use.
HEALTH_CHECKER = "health.checker"
// DEFAULT_HEALTH_CHECKER is the name under which DefaultHealthChecker is registered; it is also the
// fallback checker name when HEALTH_CHECKER is not set on the URL.
DEFAULT_HEALTH_CHECKER = "default"
// OUTSTANDING_REQUEST_COUNT_LIMIT_KEY is the URL parameter key for the maximum number of active
// (outstanding) requests an invoker may have while still being considered healthy.
OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit"
// SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY is the URL parameter key for the number of successive failed
// requests at which the circuit breaker starts to trip.
SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold"
// DEFAULT_SUCCESSIVE_FAILED_THRESHOLD is the default successive-failure threshold
// (presumably the fallback for SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY).
DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5
// CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY is the URL parameter key for the circuit breaker timeout factor.
CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor"
// DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF caps how far the successive failure count may exceed the
// threshold when computing the sleep window, which bounds the exponent of the back-off.
DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5
// DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR is the default factor the exponential back-off step is multiplied
// by to obtain the sleep window (the result is added to a millisecond timestamp, so presumably milliseconds).
DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000
// MAX_CIRCUIT_TRIPPED_TIMEOUT is the upper bound of the sleep window, in the same unit as the factor above.
MAX_CIRCUIT_TRIPPED_TIMEOUT = 30000
)

// init registers NewDefaultHealthChecker with the extension package under the DEFAULT_HEALTH_CHECKER name.
func init() {
extension.SethealthChecker(DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker)
}

// DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of
// the invoker based on the number of successive bad requests and the current number of active requests.
type DefaultHealthChecker struct {
// OutStandingRequestConutLimit is the maximum number of active requests an invoker may have;
// IsHealthy reports false once the active count exceeds it.
// The spelling of the field name is kept as-is because it is part of the exported API.
OutStandingRequestConutLimit int32
// RequestSuccessiveFailureThreshold is the number of successive failed requests at which the
// circuit breaker sleep window becomes non-zero.
RequestSuccessiveFailureThreshold int32
// CircuitTrippedTimeoutFactor is the timeout factor configured through
// CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY (default DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR).
CircuitTrippedTimeoutFactor int32
}

// IsHealthy evaluates the healthy state of the given Invoker.
//
// The invoker is reported as unhealthy when either
//   - its circuit breaker is currently tripped because of successive failed requests, or
//   - its number of active requests exceeds OutStandingRequestConutLimit.
//
// Each case emits its own debug message so that the log states the real reason for the rejection.
func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool {
	urlStatus := protocol.GetURLStatus(invoker.GetUrl())
	if c.isCircuitBreakerTripped(urlStatus) {
		logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key())
		return false
	}
	if active := urlStatus.GetActive(); active > c.OutStandingRequestConutLimit {
		logger.Debugf("Invoker [%s] has %d outstanding requests, which exceeds the limit %d",
			invoker.GetUrl().Key(), active, c.OutStandingRequestConutLimit)
		return false
	}
	return true
}

// isCircuitBreakerTripped reports whether the invoker described by status is still inside its
// circuit breaker window, i.e. a recovery timestamp exists and lies after the current time.
func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatus) bool {
	recoverAt := c.getCircuitBreakerTimeout(status)
	now := protocol.CurrentTimeMillis()
	// A non-positive recovery timestamp means the breaker never tripped.
	return recoverAt > 0 && now < recoverAt
}

// getCircuitBreakerTimeout returns the timestamp at which the invoker recovers from the tripped state:
// the timestamp of the last failed request plus the sleep window. It returns 0 when there is no
// sleep window, i.e. the breaker is not tripped.
func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStatus) int64 {
	if window := c.getCircuitBreakerSleepWindowTime(status); window > 0 {
		return status.GetLastRequestFailedTimestamp() + window
	}
	return 0
}

// getCircuitBreakerSleepWindowTime returns how long the invoker should be kept out of rotation
// after its last failed request.
//
// The window is 0 while the successive failure count is below RequestSuccessiveFailureThreshold.
// From the threshold on it grows exponentially: 2^diff * CircuitTrippedTimeoutFactor, where diff is
// the amount by which the failure count exceeds the threshold, capped at
// DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF. The result never exceeds MAX_CIRCUIT_TRIPPED_TIMEOUT.
func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 {
	diff := status.GetSuccessiveRequestFailureCount() - c.RequestSuccessiveFailureThreshold
	if diff < 0 {
		return 0
	}
	if diff > DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF {
		diff = DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF
	}

	// Honor the factor configured on the checker. A non-positive value (e.g. a checker built as a
	// struct literal without this field) falls back to the default so the breaker still works.
	factor := int64(c.CircuitTrippedTimeoutFactor)
	if factor <= 0 {
		factor = DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR
	}

	// Compute in int64 so that a large configured factor cannot overflow int32.
	sleepWindow := (int64(1) << uint(diff)) * factor
	if sleepWindow > MAX_CIRCUIT_TRIPPED_TIMEOUT {
		sleepWindow = MAX_CIRCUIT_TRIPPED_TIMEOUT
	}
	return sleepWindow
}

// NewDefaultHealthChecker constructs a new DefaultHealthChecker from the parameters of url.
//
// Parameters read from url, with their fallbacks:
//   - OUTSTANDING_REQUEST_COUNT_LIMIT_KEY: math.MaxInt32 (effectively unlimited)
//   - SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY: DEFAULT_SUCCESSIVE_FAILED_THRESHOLD
//   - CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY: DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR
//
// The values are narrowed to int32 without range checking.
func NewDefaultHealthChecker(url *common.URL) router.HealthChecker {
	return &DefaultHealthChecker{
		OutStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)),
		// The threshold has its own default constant; DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF
		// only bounds the back-off exponent and must not be used here.
		RequestSuccessiveFailureThreshold: int32(url.GetParamInt(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)),
		CircuitTrippedTimeoutFactor:       int32(url.GetParamInt(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)),
	}
}
138 changes: 138 additions & 0 deletions cluster/router/condition/default_health_check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package condition

import (
"math"
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)

// TestDefaultHealthChecker_IsHealthy walks one invoker through the healthy state, the
// "too many outstanding requests" state, the "circuit breaker tripped" state and back to healthy.
func TestDefaultHealthChecker_IsHealthy(t *testing.T) {
	defer protocol.CleanAllStatus()
	url, err := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
	assert.NoError(t, err)
	hc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
	invoker := NewMockInvoker(url, 1)
	// no request has been made yet, so the invoker is healthy
	assert.True(t, hc.IsHealthy(invoker))

	url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
	url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100")
	// fake the outgoing request
	for i := 0; i < 11; i++ {
		request(url, "test", 0, true, false)
	}
	hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
	// the outgoing request is more than OUTSTANDING_REQUEST_COUNT_LIMIT, go to unhealthy
	assert.False(t, hc.IsHealthy(invoker))

	// successive failed count is more than SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy
	for i := 0; i < 11; i++ {
		request(url, "test", 0, false, false)
	}
	url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
	url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000")
	hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
	assert.False(t, hc.IsHealthy(invoker))

	// reset successive failed count and go to healthy
	request(url, "test", 0, false, true)
	assert.True(t, hc.IsHealthy(invoker))
}

// TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime checks the sleep window at its cap,
// below the threshold, and just above the threshold.
func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) {
	defer protocol.CleanAllStatus()
	url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
	defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
	// Far more failures than the default threshold: the window is clamped to the maximum.
	for failures := 100; failures > 0; failures-- {
		request(url, "test", 1, false, false)
	}
	window := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url))
	assert.Equal(t, int64(MAX_CIRCUIT_TRIPPED_TIMEOUT), window)

	// With the threshold raised to 1000 the same failure count no longer trips the breaker.
	url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000")
	raisedHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
	window = raisedHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url))
	assert.Equal(t, int64(0), window)

	// A fresh URL has no failures, hence no sleep window.
	url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider")
	window = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1))
	assert.Equal(t, int64(0), window)

	// Six successive failures: the window is positive but still below the cap.
	for failures := 0; failures < 6; failures++ {
		request(url1, "test", 1, false, false)
	}
	window = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1))
	assert.True(t, window > 0 && window < MAX_CIRCUIT_TRIPPED_TIMEOUT)
}

// TestDefaultHealthChecker_getCircuitBreakerTimeout checks that an untouched URL has no recovery
// timestamp and that a URL with six successive failures recovers at a point in the future.
func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) {
	defer protocol.CleanAllStatus()
	url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
	defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
	recoverAt := defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url))
	assert.Equal(t, int64(0), recoverAt)

	url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider")
	for failures := 0; failures < 6; failures++ {
		request(url1, "test", 1, false, false)
	}
	recoverAt = defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url1))
	// the recovery timestamp must lie after the current time
	assert.True(t, recoverAt > protocol.CurrentTimeMillis())
}

// TestDefaultHealthChecker_isCircuitBreakerTripped checks that the breaker is closed for an
// untouched URL and tripped after 100 successive failed requests.
func TestDefaultHealthChecker_isCircuitBreakerTripped(t *testing.T) {
	defer protocol.CleanAllStatus()
	url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
	defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
	assert.False(t, defaultHc.isCircuitBreakerTripped(protocol.GetURLStatus(url)))

	// Pile up failed requests until the breaker trips.
	for failures := 100; failures > 0; failures-- {
		request(url, "test", 1, false, false)
	}
	assert.True(t, defaultHc.isCircuitBreakerTripped(protocol.GetURLStatus(url)))
}

// TestNewDefaultHealthChecker verifies the defaults applied to a URL without health check
// parameters and that explicitly configured parameters override them.
func TestNewDefaultHealthChecker(t *testing.T) {
	defer protocol.CleanAllStatus()
	url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
	defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
	assert.NotNil(t, defaultHc)
	// assert.Equal takes (expected, actual); keeping that order makes failure output read correctly.
	assert.Equal(t, int32(math.MaxInt32), defaultHc.OutStandingRequestConutLimit)
	assert.Equal(t, int32(DEFAULT_SUCCESSIVE_FAILED_THRESHOLD), defaultHc.RequestSuccessiveFailureThreshold)
	assert.Equal(t, int32(DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR), defaultHc.CircuitTrippedTimeoutFactor)

	url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
	url1.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
	url1.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
	nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker)
	assert.NotNil(t, nondefaultHc)
	assert.Equal(t, int32(10), nondefaultHc.OutStandingRequestConutLimit)
	assert.Equal(t, int32(10), nondefaultHc.RequestSuccessiveFailureThreshold)
}

// request fakes one call to method on url for the status counters.
// The call is always started; when active is true it is left in flight, otherwise it is
// finished right away with the given elapsed time and success flag.
func request(url common.URL, method string, elapsed int64, active, succeeded bool) {
	protocol.BeginCount(url, method)
	if active {
		// keep the request outstanding
		return
	}
	protocol.EndCount(url, method, elapsed, succeeded)
}
12 changes: 12 additions & 0 deletions cluster/router/condition/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
// init registers the condition, application-condition and health-check router factories with the
// extension package under their respective constant names.
func init() {
extension.SetRouterFactory(constant.ConditionRouterName, newConditionRouterFactory)
extension.SetRouterFactory(constant.ConditionAppRouterName, newAppRouterFactory)
extension.SetRouterFactory(constant.HealthCheckRouterName, newHealthCheckRouteFactory)
}

// ConditionRouterFactory Condition router factory
Expand Down Expand Up @@ -57,3 +58,14 @@ func newAppRouterFactory() router.RouterFactory {
// NewRouter creates a router for url by delegating to NewAppRouter.
func (c *AppRouterFactory) NewRouter(url *common.URL) (router.Router, error) {
return NewAppRouter(url)
}

// HealthCheckRouteFactory is the router factory that creates health-check routers.
// It carries no state.
type HealthCheckRouteFactory struct {
}

// newHealthCheckRouteFactory returns a new HealthCheckRouteFactory as a router.RouterFactory.
func newHealthCheckRouteFactory() router.RouterFactory {
return &HealthCheckRouteFactory{}
}

// NewRouter creates a health-check router for url by delegating to NewHealthCheckRouter.
func (f *HealthCheckRouteFactory) NewRouter(url *common.URL) (router.Router, error) {
return NewHealthCheckRouter(url)
}
5 changes: 5 additions & 0 deletions cluster/router/condition/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,8 @@ func TestNewAppRouterFactory(t *testing.T) {
factory := newAppRouterFactory()
assert.NotNil(t, factory)
}

func TestHealthCheckRouteFactory(t *testing.T) {
factory := newHealthCheckRouteFactory()
assert.NotNil(t, factory)
}
68 changes: 68 additions & 0 deletions cluster/router/condition/health_check_route.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package condition

import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)

const (
// HEALTH_ROUTE_ENABLED_KEY is the URL parameter key that switches health-check routing on;
// NewHealthCheckRouter treats a missing value as false.
HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled"
)

// HealthCheckRouter provides a health-first routing mechanism through HealthChecker
type HealthCheckRouter struct {
// url is the URL the router was created from; it is returned by URL().
url *common.URL
// enabled mirrors the HEALTH_ROUTE_ENABLED_KEY parameter; when false, Route returns its input unchanged.
enabled bool
// checker decides whether an invoker is healthy; it is only set when enabled is true.
checker router.HealthChecker
}

// NewHealthCheckRouter builds a HealthCheckRouter from url.
// Health routing is on only when HEALTH_ROUTE_ENABLED_KEY is true on the url; in that case the
// checker named by HEALTH_CHECKER (DEFAULT_HEALTH_CHECKER if absent) is looked up through the
// extension package. The returned error is always nil.
func NewHealthCheckRouter(url *common.URL) (router.Router, error) {
	healthRouter := &HealthCheckRouter{
		url:     url,
		enabled: url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false),
	}
	if !healthRouter.enabled {
		// Disabled routers never consult a checker, so none is resolved.
		return healthRouter, nil
	}
	name := url.GetParam(HEALTH_CHECKER, DEFAULT_HEALTH_CHECKER)
	healthRouter.checker = extension.GetHealthChecker(name, url)
	return healthRouter, nil
}

// Route returns the subset of invokers that the router's HealthChecker reports as healthy.
//
// The input is returned unchanged when health routing is disabled or when invokers is empty.
// If every invoker is considered unhealthy, the router downgrades to the full input list (and
// logs a warning) rather than returning nothing. The invocation argument is not used.
func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
	// An empty list is not "all invokers unhealthy"; skip the check and the misleading warning.
	if !r.enabled || len(invokers) == 0 {
		return invokers
	}
	// Route runs for every invocation, so size the result once instead of growing it.
	healthyInvokers := make([]protocol.Invoker, 0, len(invokers))
	// Add healthy invokers to the list
	for _, invoker := range invokers {
		if r.checker.IsHealthy(invoker) {
			healthyInvokers = append(healthyInvokers, invoker)
		}
	}
	// If all invokers are considered unhealthy, downgrade to all invokers
	if len(healthyInvokers) == 0 {
		logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey())
		return invokers
	}
	return healthyInvokers
}

// Priority returns the priority of this router, which is always 0.
// NOTE(review): presumably used by the router chain to order routers — confirm against the caller.
func (r *HealthCheckRouter) Priority() int64 {
return 0
}

// URL returns a copy of the URL this router was created from.
// It dereferences the stored pointer, so the router must have been built with a non-nil URL.
func (r *HealthCheckRouter) URL() common.URL {
return *r.url
}

// HealthyChecker returns the HealthChecker bound to this HealthCheckRouter.
// The result is nil when the router was created with health routing disabled, because the
// checker is only resolved for enabled routers.
func (r *HealthCheckRouter) HealthyChecker() router.HealthChecker {
return r.checker
}
Loading