Skip to content

Commit

Permalink
[xds_cluster_impl_balancer] delete request counter for circuit breaking
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Feb 4, 2021
1 parent 0a523fe commit b2fc9e1
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 41 deletions.
59 changes: 24 additions & 35 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import (
)

const (
clusterImplName = "xds_cluster_impl_experimental"
defaultRequestCountMax = 1024
clusterImplName = "xds_cluster_impl_experimental"
// TODO: define defaultRequestCountMax = 1024
)

func init() {
Expand All @@ -52,11 +52,10 @@ type clusterImplBB struct{}

func (clusterImplBB) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
b := &clusterImplBalancer{
ClientConn: cc,
closed: grpcsync.NewEvent(),
loadWrapper: loadstore.NewLoadStoreWrapper(),
pickerUpdateCh: buffer.NewUnbounded(),
requestCountMax: defaultRequestCountMax,
ClientConn: cc,
closed: grpcsync.NewEvent(),
loadWrapper: loadstore.NewLoadStoreWrapper(),
pickerUpdateCh: buffer.NewUnbounded(),
}
b.logger = prefixLogger(b)
b.logger.Infof("Created")
Expand Down Expand Up @@ -107,11 +106,10 @@ type clusterImplBalancer struct {
// childState/drops/requestCounter can only be accessed in run(). And run()
// is the only goroutine that sends picker to the parent ClientConn. All
// requests to update picker need to be sent to pickerUpdateCh.
childState balancer.State
drops []*dropper
requestCounter *xdsclient.ServiceRequestsCounter
requestCountMax uint32
pickerUpdateCh *buffer.Unbounded
childState balancer.State
drops []*dropper
// TODO: add serviceRequestCount and maxRequestCount for circuit breaking.
pickerUpdateCh *buffer.Unbounded
}

// updateLoadStore checks the config for load store, and decides whether it
Expand Down Expand Up @@ -201,27 +199,19 @@ func (cib *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState
updatePicker = true
}

// Compare cluster name. And update picker if it's changed, because circuit
// breaking's stream counter will be different.
if cib.config == nil || cib.config.Cluster != newConfig.Cluster {
cib.requestCounter = xdsclient.GetServiceRequestsCounter(newConfig.Cluster)
updatePicker = true
}
// Compare upper bound of stream count. And update picker if it's changed.
// This is also for circuit breaking.
var newRequestCountMax uint32 = 1024
if newConfig.MaxConcurrentRequests != nil {
newRequestCountMax = *newConfig.MaxConcurrentRequests
}
if cib.requestCountMax != newRequestCountMax {
cib.requestCountMax = newRequestCountMax
updatePicker = true
}
// TODO: compare cluster name. And update picker if it's changed, because
// circuit breaking's stream counter will be different.
//
// Set `updatePicker` to manually update the picker.

// TODO: compare upper bound of stream count. And update picker if it's
// changed. This is also for circuit breaking.
//
// Set `updatePicker` to manually update the picker.

if updatePicker {
cib.pickerUpdateCh.Put(&dropConfigs{
drops: cib.drops,
requestCounter: cib.requestCounter,
drops: cib.drops,
})
}

Expand Down Expand Up @@ -291,8 +281,7 @@ func (cib *clusterImplBalancer) UpdateState(state balancer.State) {
}

type dropConfigs struct {
drops []*dropper
requestCounter *xdsclient.ServiceRequestsCounter
drops []*dropper
}

func (cib *clusterImplBalancer) run() {
Expand All @@ -305,15 +294,15 @@ func (cib *clusterImplBalancer) run() {
cib.childState = u
cib.ClientConn.UpdateState(balancer.State{
ConnectivityState: cib.childState.ConnectivityState,
Picker: newDropPicker(cib.childState, cib.drops, cib.loadWrapper, cib.requestCounter),
Picker: newDropPicker(cib.childState, cib.drops, cib.loadWrapper),
})
case *dropConfigs:
cib.drops = u.drops
cib.requestCounter = u.requestCounter
// cib.requestCounter = u.requestCounter
if cib.childState.Picker != nil {
cib.ClientConn.UpdateState(balancer.State{
ConnectivityState: cib.childState.ConnectivityState,
Picker: newDropPicker(cib.childState, cib.drops, cib.loadWrapper, cib.requestCounter),
Picker: newDropPicker(cib.childState, cib.drops, cib.loadWrapper),
})
}
}
Expand Down
8 changes: 2 additions & 6 deletions xds/internal/balancer/clusterimpl/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/load"
)

Expand Down Expand Up @@ -73,17 +72,14 @@ type dropPicker struct {
drops []*dropper
s balancer.State
loadStore loadReporter
// FIXME: remove this and add in the next PR.
counter *client.ServiceRequestsCounter
// TODO: add maxRequestCount for circuit breaking.
// TODO: add serviceRequestCount and maxRequestCount for circuit breaking.
}

func newDropPicker(s balancer.State, drops []*dropper, loadStore load.PerClusterReporter, counter *client.ServiceRequestsCounter) *dropPicker {
func newDropPicker(s balancer.State, drops []*dropper, loadStore load.PerClusterReporter) *dropPicker {
return &dropPicker{
drops: drops,
s: s,
loadStore: loadStore,
counter: counter,
}
}

Expand Down

0 comments on commit b2fc9e1

Please sign in to comment.