From 26c143bd5f59344a4b8a1e491e0f5e18aa97abc7 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 18 Feb 2021 10:12:25 -0800 Subject: [PATCH] circuit breaking: update picker inline when there's a counter update (#4212) --- xds/internal/balancer/edsbalancer/eds_impl.go | 19 +++++++- .../balancer/edsbalancer/eds_impl_test.go | 45 +++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index 8d0cc8898bcf..894b9a0152bc 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -419,12 +419,29 @@ func (edsImpl *edsBalancerImpl) updateServiceRequestsConfig(serviceName string, if !env.CircuitBreakingSupport { return } + edsImpl.pickerMu.Lock() + var updatePicker bool if edsImpl.serviceRequestsCounter == nil || edsImpl.serviceRequestsCounter.ServiceName != serviceName { edsImpl.serviceRequestsCounter = client.GetServiceRequestsCounter(serviceName) + updatePicker = true } + + var newMax uint32 = defaultServiceRequestCountMax if max != nil { - edsImpl.serviceRequestCountMax = *max + newMax = *max + } + if edsImpl.serviceRequestCountMax != newMax { + edsImpl.serviceRequestCountMax = newMax + updatePicker = true + } + if updatePicker && edsImpl.innerState.Picker != nil { + // Update picker with old inner picker, new counter and counterMax. + edsImpl.cc.UpdateState(balancer.State{ + ConnectivityState: edsImpl.innerState.ConnectivityState, + Picker: newDropPicker(edsImpl.innerState.Picker, edsImpl.drops, edsImpl.loadReporter, edsImpl.serviceRequestsCounter, edsImpl.serviceRequestCountMax)}, + ) } + edsImpl.pickerMu.Unlock() } // updateState first handles priority, and then wraps picker in a drop picker diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index 7f8a0d3aa82f..3334376f4a9a 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -629,6 +629,51 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { for _, done := range dones { done() } + + // Send another update, with only circuit breaking update (and no picker + // update afterwards). Make sure the new picker uses the new configs. + var maxRequests2 uint32 = 10 + edsb.updateServiceRequestsConfig("test", &maxRequests2) + + // Picks with drops. + dones = []func(){} + p2 := <-cc.NewPickerCh + for i := 0; i < 100; i++ { + pr, err := p2.Pick(balancer.PickInfo{}) + if i < 10 && err != nil { + t.Errorf("The first 10%% picks should be non-drops, got error %v", err) + } else if i > 10 && err == nil { + t.Errorf("The next 90%% picks should be drops, got error ") + } + dones = append(dones, func() { + if pr.Done != nil { + pr.Done(balancer.DoneInfo{}) + } + }) + } + + for _, done := range dones { + done() + } + dones = []func(){} + + // Pick without drops. + for i := 0; i < 10; i++ { + pr, err := p2.Pick(balancer.PickInfo{}) + if err != nil { + t.Errorf("The next 10%% picks should be non-drops, got error %v", err) + } + dones = append(dones, func() { + if pr.Done != nil { + pr.Done(balancer.DoneInfo{}) + } + }) + } + + // Without this, future tests with the same service name will fail. + for _, done := range dones { + done() + } } func init() {