From b2fc9e193b87c73ff8e9893838557c3e7be4c1db Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 4 Feb 2021 13:27:22 -0800 Subject: [PATCH] [xds_cluster_impl_balancer] delete request counter for circuit breaking --- .../balancer/clusterimpl/clusterimpl.go | 59 ++++++++----------- xds/internal/balancer/clusterimpl/picker.go | 8 +-- 2 files changed, 26 insertions(+), 41 deletions(-) diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 7e8d52a3eae0..df571fab1ed2 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -38,8 +38,8 @@ import ( ) const ( - clusterImplName = "xds_cluster_impl_experimental" - defaultRequestCountMax = 1024 + clusterImplName = "xds_cluster_impl_experimental" + // TODO: define defaultRequestCountMax = 1024 ) func init() { @@ -52,11 +52,10 @@ type clusterImplBB struct{} func (clusterImplBB) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer { b := &clusterImplBalancer{ - ClientConn: cc, - closed: grpcsync.NewEvent(), - loadWrapper: loadstore.NewLoadStoreWrapper(), - pickerUpdateCh: buffer.NewUnbounded(), - requestCountMax: defaultRequestCountMax, + ClientConn: cc, + closed: grpcsync.NewEvent(), + loadWrapper: loadstore.NewLoadStoreWrapper(), + pickerUpdateCh: buffer.NewUnbounded(), } b.logger = prefixLogger(b) b.logger.Infof("Created") @@ -107,11 +106,10 @@ type clusterImplBalancer struct { // childState/drops/requestCounter can only be accessed in run(). And run() // is the only goroutine that sends picker to the parent ClientConn. All // requests to update picker need to be sent to pickerUpdateCh. - childState balancer.State - drops []*dropper - requestCounter *xdsclient.ServiceRequestsCounter - requestCountMax uint32 - pickerUpdateCh *buffer.Unbounded + childState balancer.State + drops []*dropper + // TODO: add serviceRequestCount and maxRequestCount for circuit breaking. + pickerUpdateCh *buffer.Unbounded } // updateLoadStore checks the config for load store, and decides whether it @@ -201,27 +199,19 @@ func (cib *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState updatePicker = true } - // Compare cluster name. And update picker if it's changed, because circuit - // breaking's stream counter will be different. - if cib.config == nil || cib.config.Cluster != newConfig.Cluster { - cib.requestCounter = xdsclient.GetServiceRequestsCounter(newConfig.Cluster) - updatePicker = true - } - // Compare upper bound of stream count. And update picker if it's changed. - // This is also for circuit breaking. - var newRequestCountMax uint32 = 1024 - if newConfig.MaxConcurrentRequests != nil { - newRequestCountMax = *newConfig.MaxConcurrentRequests - } - if cib.requestCountMax != newRequestCountMax { - cib.requestCountMax = newRequestCountMax - updatePicker = true - } + // TODO: compare cluster name. And update picker if it's changed, because + // circuit breaking's stream counter will be different. + // + // Set `updatePicker` to manually update the picker. + + // TODO: compare upper bound of stream count. And update picker if it's + // changed. This is also for circuit breaking. + // + // Set `updatePicker` to manually update the picker. if updatePicker { cib.pickerUpdateCh.Put(&dropConfigs{ - drops: cib.drops, - requestCounter: cib.requestCounter, + drops: cib.drops, }) } @@ -291,8 +281,7 @@ func (cib *clusterImplBalancer) UpdateState(state balancer.State) { } type dropConfigs struct { - drops []*dropper - requestCounter *xdsclient.ServiceRequestsCounter + drops []*dropper } func (cib *clusterImplBalancer) run() { @@ -305,15 +294,15 @@ func (cib *clusterImplBalancer) run() { cib.childState = u cib.ClientConn.UpdateState(balancer.State{ ConnectivityState: cib.childState.ConnectivityState, - Picker: newDropPicker(cib.childState, cib.drops, cib.loadWrapper, cib.requestCounter), + Picker: newDropPicker(cib.childState, cib.drops, cib.loadWrapper), }) case *dropConfigs: cib.drops = u.drops - cib.requestCounter = u.requestCounter + // cib.requestCounter = u.requestCounter if cib.childState.Picker != nil { cib.ClientConn.UpdateState(balancer.State{ ConnectivityState: cib.childState.ConnectivityState, - Picker: newDropPicker(cib.childState, cib.drops, cib.loadWrapper, cib.requestCounter), + Picker: newDropPicker(cib.childState, cib.drops, cib.loadWrapper), }) } } diff --git a/xds/internal/balancer/clusterimpl/picker.go b/xds/internal/balancer/clusterimpl/picker.go index e8993c6675c4..05e9f89786fd 100644 --- a/xds/internal/balancer/clusterimpl/picker.go +++ b/xds/internal/balancer/clusterimpl/picker.go @@ -24,7 +24,6 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/wrr" "google.golang.org/grpc/status" - "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/load" ) @@ -73,17 +72,14 @@ type dropPicker struct { drops []*dropper s balancer.State loadStore loadReporter - // FIXME: remove this and add in the next PR. - counter *client.ServiceRequestsCounter - // TODO: add maxRequestCount for circuit breaking. + // TODO: add serviceRequestCount and maxRequestCount for circuit breaking. } -func newDropPicker(s balancer.State, drops []*dropper, loadStore load.PerClusterReporter, counter *client.ServiceRequestsCounter) *dropPicker { +func newDropPicker(s balancer.State, drops []*dropper, loadStore load.PerClusterReporter) *dropPicker { return &dropPicker{ drops: drops, s: s, loadStore: loadStore, - counter: counter, } }